Skip to content

Commit

Permalink
Merge branch 'master' into simulator_with_multiple_pds
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jun 19, 2024
2 parents ec7c588 + fbd6bd2 commit d140be6
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,6 @@ issues:
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|plugin/scheduler_example/evict_leader.go|server/api/.*\.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/schedule/schedulers/.*\.go|pkg/schedule/placement/rule.go|pkg/tso/allocator_manager.go|pkg/core/store_stats.go|pkg/core/store_stats.go|pkg/storage/hot_region_storage.go|pkg/syncer/server.go)
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|server/api/.*\.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/schedule/schedulers/.*\.go|pkg/syncer/server.go)
linters:
- errcheck
27 changes: 27 additions & 0 deletions client/http/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,30 @@ func mustMarshalAndUnmarshalRuleOp(re *require.Assertions, ruleOp *RuleOp) *Rule
re.NoError(err)
return newRuleOp
}

// startKey and endKey are json:"-" which means cannot be Unmarshal from json
// We need to take care of `Clone` method.
func TestRuleKeyClone(t *testing.T) {
re := require.New(t)
r := &Rule{
StartKey: []byte{1, 2, 3},
EndKey: []byte{4, 5, 6},
}

clone := r.Clone()
// Modify the original rule
r.StartKey[0] = 9
r.EndKey[0] = 9

// The clone should not be affected
re.Equal([]byte{1, 2, 3}, clone.StartKey)
re.Equal([]byte{4, 5, 6}, clone.EndKey)

// Modify the clone
clone.StartKey[0] = 8
clone.EndKey[0] = 8

// The original rule should not be affected
re.Equal([]byte{9, 2, 3}, r.StartKey)
re.Equal([]byte{9, 5, 6}, r.EndKey)
}
5 changes: 2 additions & 3 deletions pkg/core/store_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/movingaverage"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

type storeStats struct {
Expand Down Expand Up @@ -56,10 +57,8 @@ func (ss *storeStats) GetStoreStats() *pdpb.StoreStats {
// CloneStoreStats returns the statistics information cloned from the store.
func (ss *storeStats) CloneStoreStats() *pdpb.StoreStats {
ss.mu.RLock()
b, _ := ss.rawStats.Marshal()
stats := typeutil.DeepClone(ss.rawStats, StoreStatsFactory)
ss.mu.RUnlock()
stats := &pdpb.StoreStats{}
stats.Unmarshal(b)
return stats
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *Rule) String() string {
// Clone returns a copy of Rule.
func (r *Rule) Clone() *Rule {
var clone Rule
json.Unmarshal([]byte(r.String()), &clone)
_ = json.Unmarshal([]byte(r.String()), &clone)
clone.StartKey = append(r.StartKey[:0:0], r.StartKey...)
clone.EndKey = append(r.EndKey[:0:0], r.EndKey...)
return &clone
Expand Down
27 changes: 27 additions & 0 deletions pkg/schedule/placement/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,30 @@ func TestBuildRuleList(t *testing.T) {
re.Equal(testCase.expect.ranges, result.ranges)
}
}

// startKey and endKey are json:"-" which means cannot be Unmarshal from json
// We need to take care of `Clone` method.
func TestRuleKeyClone(t *testing.T) {
re := require.New(t)
r := &Rule{
StartKey: []byte{1, 2, 3},
EndKey: []byte{4, 5, 6},
}

clone := r.Clone()
// Modify the original rule
r.StartKey[0] = 9
r.EndKey[0] = 9

// The clone should not be affected
re.Equal([]byte{1, 2, 3}, clone.StartKey)
re.Equal([]byte{4, 5, 6}, clone.EndKey)

// Modify the clone
clone.StartKey[0] = 8
clone.EndKey[0] = 8

// The original rule should not be affected
re.Equal([]byte{9, 2, 3}, r.StartKey)
re.Equal([]byte{9, 5, 6}, r.EndKey)
}
4 changes: 3 additions & 1 deletion pkg/storage/hot_region_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ func (h *HotRegionStorage) backgroundDelete() {
there may be residual hot regions, you can remove it manually, [pd-dir]/data/hot-region.`)
continue
}
h.delete(int(curReservedDays))
if err := h.delete(int(curReservedDays)); err != nil {
log.Error("delete hot region meet error", errs.ZapError(err))
}
case <-h.hotRegionInfoCtx.Done():
return
}
Expand Down
64 changes: 15 additions & 49 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,13 @@ func (am *AllocatorManager) campaignAllocatorLeader(
dcLocationInfo *pdpb.GetDCLocationInfoResponse,
isNextLeader bool,
) {
log.Info("start to campaign local tso allocator leader",
logger := log.With(
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
zap.String("name", am.member.Name()),
)
logger.Info("start to campaign local tso allocator leader")
cmps := make([]clientv3.Cmp, 0)
nextLeaderKey := am.nextLeaderKey(allocator.GetDCLocation())
if !isNextLeader {
Expand All @@ -648,18 +650,9 @@ func (am *AllocatorManager) campaignAllocatorLeader(
})
if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
logger.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully")
} else {
log.Error("failed to campaign local tso allocator leader due to etcd error",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()),
errs.ZapError(err))
logger.Error("failed to campaign local tso allocator leader due to etcd error", errs.ZapError(err))
}
return
}
Expand All @@ -670,44 +663,25 @@ func (am *AllocatorManager) campaignAllocatorLeader(
defer am.ResetAllocatorGroup(allocator.GetDCLocation())
// Maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)
log.Info("campaign local tso allocator leader ok",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))

log.Info("initialize the local TSO allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
logger.Info("Complete campaign local tso allocator leader, begin to initialize the local TSO allocator")
if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
log.Error("failed to initialize the local TSO allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
log.Error("failed to initialize the local TSO allocator", errs.ZapError(err))
return
}
if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
log.Error("failed to write the max local TSO after member changed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
log.Error("failed to write the max local TSO after member changed", errs.ZapError(err))
return
}
}
am.compareAndSetMaxSuffix(dcLocationInfo.Suffix)
allocator.EnableAllocatorLeader()
// The next leader is me, delete it to finish campaigning
am.deleteNextLeaderID(allocator.GetDCLocation())
log.Info("local tso allocator leader is ready to serve",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
if err := am.deleteNextLeaderID(allocator.GetDCLocation()); err != nil {
logger.Warn("failed to delete next leader key after campaign local tso allocator leader", errs.ZapError(err))
}
logger.Info("local tso allocator leader is ready to serve")

leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval)
defer leaderTicker.Stop()
Expand All @@ -716,20 +690,12 @@ func (am *AllocatorManager) campaignAllocatorLeader(
select {
case <-leaderTicker.C:
if !allocator.IsAllocatorLeader() {
log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
logger.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down")
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed, reset the local tso allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
logger.Info("server is closed, reset the local tso allocator")
return
}
}
Expand Down
46 changes: 27 additions & 19 deletions plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
id = (uint64)(idFloat)
if _, exists = handler.config.StoreIDWitRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
Expand All @@ -273,47 +273,55 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
args = append(args, handler.config.getRanges(id)...)
}

handler.config.BuildWithArgs(args)
err := handler.config.Persist()
err := handler.config.BuildWithArgs(args)
if err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
handler.rd.JSON(w, http.StatusOK, nil)
err = handler.config.Persist()
if err != nil {
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
_ = handler.rd.JSON(w, http.StatusOK, nil)
}

func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
_ = handler.rd.JSON(w, http.StatusOK, conf)
}

func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["store_id"]
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
_ = handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

handler.config.mu.Lock()
defer handler.config.mu.Unlock()
_, exists := handler.config.StoreIDWitRanges[id]
if exists {
delete(handler.config.StoreIDWitRanges, id)
handler.config.cluster.ResumeLeaderTransfer(id)
if !exists {
_ = handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist"))
return
}
delete(handler.config.StoreIDWitRanges, id)
handler.config.cluster.ResumeLeaderTransfer(id)

handler.config.mu.Unlock()
handler.config.Persist()
handler.config.mu.Unlock()
if err := handler.config.Persist(); err != nil {
handler.config.mu.Lock()

var resp any
if len(handler.config.StoreIDWitRanges) == 0 {
resp = noStoreInSchedulerInfo
}
handler.rd.JSON(w, http.StatusOK, resp)
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
handler.config.mu.Lock()

handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist"))
var resp any
if len(handler.config.StoreIDWitRanges) == 0 {
resp = noStoreInSchedulerInfo
}
_ = handler.rd.JSON(w, http.StatusOK, resp)
}

func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler {
Expand Down

0 comments on commit d140be6

Please sign in to comment.