Merge branch 'master' into add-rule-metrics
ti-chi-bot[bot] authored Oct 20, 2023
2 parents 6aac92e + 43551e2 commit 30431fc
Showing 24 changed files with 364 additions and 100 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -33,7 +33,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
4 changes: 2 additions & 2 deletions go.sum
@@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA=
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk=
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
17 changes: 17 additions & 0 deletions pkg/core/region.go
@@ -1675,6 +1675,23 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 {
return r.tree.TotalSize() / int64(r.tree.length())
}

// ValidRegion is used to decide if the region is valid.
func (r *RegionsInfo) ValidRegion(region *metapb.Region) error {
startKey := region.GetStartKey()
currentRegion := r.GetRegionByKey(startKey)
if currentRegion == nil {
return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region)))
}
// If the request epoch is behind the current region epoch, return an error.
regionEpoch := region.GetRegionEpoch()
currentEpoch := currentRegion.GetMeta().GetRegionEpoch()
if regionEpoch.GetVersion() < currentEpoch.GetVersion() ||
regionEpoch.GetConfVer() < currentEpoch.GetConfVer() {
return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currentEpoch)
}
return nil
}
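
For illustration only, here is a minimal, self-contained sketch of the epoch comparison that ValidRegion performs (the epoch values are hypothetical; the real method also resolves the current region by its start key):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

func main() {
	// Hypothetical epochs for illustration only.
	request := &metapb.RegionEpoch{ConfVer: 5, Version: 3}
	current := &metapb.RegionEpoch{ConfVer: 5, Version: 4}
	// A request is stale if either component lags behind the current epoch:
	// Version changes on split/merge, ConfVer on peer membership changes.
	stale := request.GetVersion() < current.GetVersion() ||
		request.GetConfVer() < current.GetConfVer()
	fmt.Println(stale) // true: the request's version is behind
}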

// DiffRegionPeersInfo returns the difference of peers info between two RegionInfo
func DiffRegionPeersInfo(origin *RegionInfo, other *RegionInfo) string {
var ret []string
23 changes: 18 additions & 5 deletions pkg/mcs/scheduling/server/config/config.go
@@ -18,6 +18,7 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"sync/atomic"
"time"
@@ -232,6 +233,14 @@ func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
return v.(chan<- struct{})
}

func (o *PersistConfig) tryNotifySchedulersUpdating() {
notifier := o.getSchedulersUpdatingNotifier()
if notifier == nil {
return
}
notifier <- struct{}{}
}

// GetClusterVersion returns the cluster version.
func (o *PersistConfig) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
@@ -251,11 +260,10 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig {
func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
old := o.GetScheduleConfig()
o.schedule.Store(cfg)
// The coordinator is not aware of the underlying scheduler config changes, however, it
// should react on the scheduler number changes to handle the add/remove scheduler events.
if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil &&
len(old.Schedulers) != len(cfg.Schedulers) {
notifier <- struct{}{}
// The coordinator is not aware of the underlying scheduler config changes,
// so we should proactively notify it to update the schedulers.
if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) {
o.tryNotifySchedulersUpdating()
}
}
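
The switch from comparing scheduler counts to reflect.DeepEqual matters when a scheduler is replaced or reconfigured without the total number changing. A minimal sketch of the case the old length check missed (the config type here is a simplified stand-in, not the real sc.SchedulerConfig):

package main

import (
	"fmt"
	"reflect"
)

// schedulerCfg is a hypothetical, simplified stand-in for the real config.
type schedulerCfg struct {
	Type string
	Args []string
}

func main() {
	old := []schedulerCfg{{Type: "balance-leader-scheduler"}}
	cur := []schedulerCfg{{Type: "balance-region-scheduler"}} // same length, different scheduler
	fmt.Println(len(old) != len(cur))         // false: the old check would not notify
	fmt.Println(!reflect.DeepEqual(old, cur)) // true: the new check notifies
}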

@@ -650,6 +658,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
return o.GetStoreConfig().IsRaftKV2()
}

// IsTikvRegionSplitEnabled returns whether TiKV region split is enabled.
func (o *PersistConfig) IsTikvRegionSplitEnabled() bool {
return o.GetScheduleConfig().EnableTiKVSplitRegion
}

// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
71 changes: 71 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service.go
@@ -22,13 +22,16 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -254,6 +257,74 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper
}, nil
}

// AskBatchSplit implements gRPC SchedulingServer.
func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

if request.GetRegion() == nil {
return &schedulingpb.AskBatchSplitResponse{
Header: s.wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN,
"missing region for split"),
}, nil
}

if c.persistConfig.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.persistConfig.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
}
reqRegion := request.GetRegion()
splitCount := request.GetSplitCount()
err := c.ValidRegion(reqRegion)
if err != nil {
return nil, err
}
splitIDs := make([]*pdpb.SplitID, 0, splitCount)
recordRegions := make([]uint64, 0, splitCount+1)

for i := 0; i < int(splitCount); i++ {
newRegionID, err := c.AllocID()
if err != nil {
return nil, errs.ErrSchedulerNotFound.FastGenByArgs()
}

peerIDs := make([]uint64, len(reqRegion.GetPeers()))
for j := 0; j < len(peerIDs); j++ {
if peerIDs[j], err = c.AllocID(); err != nil {
return nil, err
}
}

recordRegions = append(recordRegions, newRegionID)
splitIDs = append(splitIDs, &pdpb.SplitID{
NewRegionId: newRegionID,
NewPeerIds: peerIDs,
})

log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs))
}

recordRegions = append(recordRegions, reqRegion.GetId())
if versioninfo.IsFeatureSupported(c.persistConfig.GetClusterVersion(), versioninfo.RegionMerge) {
// Disable merging of these regions for a period of time.
c.GetCoordinator().GetMergeChecker().RecordRegionSplit(recordRegions)
}

// If a region splits during the scheduling process, regions with abnormal
// status may be left behind, and these regions need to be checked with
// higher priority.
c.GetCoordinator().GetCheckerController().AddSuspectRegions(recordRegions...)

return &schedulingpb.AskBatchSplitResponse{
Header: s.header(),
Ids: splitIDs,
}, nil
}
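
As a rough illustration of the allocation pattern above: splitting one region into splitCount new regions requires splitCount new region IDs plus one new peer ID per replica of each new region. A self-contained sketch with a hypothetical in-memory allocator standing in for the cluster's ID service:

package main

import "fmt"

func main() {
	nextID := uint64(100) // hypothetical allocator state
	allocID := func() uint64 { nextID++; return nextID }

	splitCount, peersPerRegion := 2, 3
	for i := 0; i < splitCount; i++ {
		regionID := allocID()
		peerIDs := make([]uint64, peersPerRegion)
		for j := range peerIDs {
			peerIDs[j] = allocID()
		}
		fmt.Println(regionID, peerIDs) // e.g. 101 [102 103 104], then 105 [106 107 108]
	}
}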

// RegisterGRPCService registers the service to gRPC server.
func (s *Service) RegisterGRPCService(g *grpc.Server) {
schedulingpb.RegisterSchedulingServer(g, s)
21 changes: 15 additions & 6 deletions pkg/schedule/coordinator.go
@@ -458,13 +458,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
}
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if needRun {
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
} else if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
} else {
log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
}
}

@@ -484,17 +487,23 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
continue
}

log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if needRun {
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
} else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
log.Info("create scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
}
}

97 changes: 88 additions & 9 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -15,6 +15,11 @@
package schedulers

import (
"net/http"
"sync/atomic"
"time"

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
@@ -23,6 +28,8 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)

@@ -40,8 +47,27 @@ const (
var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule")

type evictSlowStoreSchedulerConfig struct {
storage endpoint.ConfigStorage
EvictedStores []uint64 `json:"evict-stores"`
storage endpoint.ConfigStorage
// Last timestamp of the chosen slow store for eviction.
lastSlowStoreCaptureTS time.Time
// Duration gap for recovering the candidate, unit: s.
RecoveryDurationGap uint64 `json:"recovery-duration"`
EvictedStores []uint64 `json:"evict-stores"`
}

func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig {
return &evictSlowStoreSchedulerConfig{
storage: storage,
lastSlowStoreCaptureTS: time.Time{},
RecoveryDurationGap: defaultRecoveryDurationGap,
EvictedStores: make([]uint64, 0),
}
}

func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig {
return &evictSlowStoreSchedulerConfig{
RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap),
}
}

func (conf *evictSlowStoreSchedulerConfig) Persist() error {
@@ -78,23 +104,77 @@ func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 {
return conf.EvictedStores[0]
}

// readyForRecovery checks whether the last captured candidate is ready for recovery.
func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool {
recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap)
failpoint.Inject("transientRecoveryGap", func() {
recoveryDurationGap = 0
})
return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap
}
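
A self-contained sketch of the arithmetic behind readyForRecovery (the timestamps and gap are hypothetical; in the real scheduler the gap comes from the config, and a failpoint can force it to zero in tests):

package main

import (
	"fmt"
	"time"
)

func main() {
	recoveryDurationGap := uint64(600) // hypothetical gap of 600 seconds
	lastCapture := time.Now().Add(-5 * time.Minute)
	ready := uint64(time.Since(lastCapture).Seconds()) >= recoveryDurationGap
	fmt.Println(ready) // false: only ~300 of the required 600 seconds have elapsed
}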

func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
conf.EvictedStores = []uint64{id}
conf.lastSlowStoreCaptureTS = time.Now()
return conf.Persist()
}

func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) {
oldID = conf.evictStore()
if oldID > 0 {
conf.EvictedStores = []uint64{}
conf.lastSlowStoreCaptureTS = time.Time{}
err = conf.Persist()
}
return
}

type evictSlowStoreHandler struct {
rd *render.Render
config *evictSlowStoreSchedulerConfig
}

func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler {
h := &evictSlowStoreHandler{
config: config,
rd: render.New(render.Options{IndentJSON: true}),
}
router := mux.NewRouter()
router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost)
router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet)
return router
}

func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
return
}
recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
if !ok {
handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
return
}
recoveryDurationGap := (uint64)(recoveryDurationGapFloat)
prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap)
atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap)
log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
handler.rd.JSON(w, http.StatusOK, nil)
}

func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
}
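
The handler above expects a JSON object whose recovery-duration field is a number of seconds. A minimal client-side sketch; the URL is an assumption for illustration and depends on how and where the scheduler's HTTP service is mounted:

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Hypothetical endpoint; adjust host and path to the actual deployment.
	url := "http://127.0.0.1:2379/pd/api/v1/schedulers/evict-slow-store-scheduler/config"
	body := strings.NewReader(`{"recovery-duration": 600}`)
	resp, err := http.Post(url, "application/json", body)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect 200 OK on success
}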

type evictSlowStoreScheduler struct {
*BaseScheduler
conf *evictSlowStoreSchedulerConfig
conf *evictSlowStoreSchedulerConfig
handler http.Handler
}

func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

func (s *evictSlowStoreScheduler) GetName() string {
@@ -168,7 +248,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
// slow node next time.
log.Info("slow store has been removed",
zap.Uint64("store-id", store.GetID()))
} else if store.GetSlowScore() <= slowStoreRecoverThreshold {
} else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() {
log.Info("slow store has been recovered",
zap.Uint64("store-id", store.GetID()))
} else {
@@ -211,11 +291,10 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun

// newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores.
func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler {
base := NewBaseScheduler(opController)

s := &evictSlowStoreScheduler{
BaseScheduler: base,
handler := newEvictSlowStoreHandler(conf)
return &evictSlowStoreScheduler{
BaseScheduler: NewBaseScheduler(opController),
conf: conf,
handler: handler,
}
return s
}
