Skip to content

Commit

Permalink
Merge branch 'master' into fix-trace
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Nov 25, 2024
2 parents accfd18 + 19c54c8 commit e7072cc
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 122 deletions.
61 changes: 32 additions & 29 deletions pkg/id/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
package id

import (
"path"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type label string

const (
// DefaultLabel is the default label for id allocator.
DefaultLabel label = "idalloc"
// KeyspaceLabel is the label for keyspace id allocator.
KeyspaceLabel label = "keyspace-idAlloc"
)

// Allocator is the allocator to generate unique ID.
type Allocator interface {
// SetBase set base id
Expand All @@ -48,13 +56,11 @@ type allocatorImpl struct {
base uint64
end uint64

client *clientv3.Client
rootPath string
allocPath string
label string
member string
step uint64
metrics *metrics
client *clientv3.Client
label label
member string
step uint64
metrics *metrics
}

// metrics is a collection of idAllocator's metrics.
Expand All @@ -64,24 +70,20 @@ type metrics struct {

// AllocatorParams are parameters needed to create a new ID Allocator.
type AllocatorParams struct {
Client *clientv3.Client
RootPath string
AllocPath string // AllocPath specifies path to the persistent window boundary.
Label string // Label used to label metrics and logs.
Member string // Member value, used to check if current pd leader.
Step uint64 // Step size of each persistent window boundary increment, default 1000.
Client *clientv3.Client
Label label // Label used to label metrics and logs.
Member string // Member value, used to check if current pd leader.
Step uint64 // Step size of each persistent window boundary increment, default 1000.
}

// NewAllocator creates a new ID Allocator.
func NewAllocator(params *AllocatorParams) Allocator {
allocator := &allocatorImpl{
client: params.Client,
rootPath: params.RootPath,
allocPath: params.AllocPath,
label: params.Label,
member: params.Member,
step: params.Step,
metrics: &metrics{idGauge: idGauge.WithLabelValues(params.Label)},
client: params.Client,
label: params.Label,
member: params.Member,
step: params.Step,
metrics: &metrics{idGauge: idGauge.WithLabelValues(string(params.Label))},
}
if allocator.step == 0 {
allocator.step = defaultAllocStep
Expand Down Expand Up @@ -127,9 +129,14 @@ func (alloc *allocatorImpl) Rebase() error {
}

func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error {
key := alloc.getAllocIDPath()
var key string
if alloc.label == KeyspaceLabel {
key = keypath.KeyspaceAllocIDPath()
} else {
key = keypath.AllocIDPath()
}

leaderPath := path.Join(alloc.rootPath, "leader")
leaderPath := keypath.LeaderPath(nil)
var (
cmps = []clientv3.Cmp{clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member)}
end uint64
Expand Down Expand Up @@ -173,10 +180,6 @@ func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error {
// please do not reorder the first field, it's need when getting the new-end
// see: https://docs.pingcap.com/tidb/dev/pd-recover#get-allocated-id-from-pd-log
log.Info("idAllocator allocates a new id", zap.Uint64("new-end", end), zap.Uint64("new-base", alloc.base),
zap.String("label", alloc.label), zap.Bool("check-curr-end", checkCurrEnd))
zap.String("label", string(alloc.label)), zap.Bool("check-curr-end", checkCurrEnd))
return nil
}

func (alloc *allocatorImpl) getAllocIDPath() string {
return path.Join(alloc.rootPath, alloc.allocPath)
}
25 changes: 11 additions & 14 deletions pkg/id/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package id

import (
"context"
"strconv"
"sync"
"testing"

Expand All @@ -25,10 +24,7 @@ import (
)

const (
rootPath = "/pd"
leaderPath = "/pd/leader"
allocPath = "alloc_id"
label = "idalloc"
leaderPath = "/pd/0/leader"
memberVal = "member"
step = uint64(500)
)
Expand All @@ -44,24 +40,25 @@ func TestMultipleAllocator(t *testing.T) {
_, err := client.Put(context.Background(), leaderPath, memberVal)
re.NoError(err)

var i uint64
wg := sync.WaitGroup{}
for i := range 3 {
iStr := strconv.Itoa(i)
fn := func(label label) {
wg.Add(1)
// All allocators share rootPath and memberVal, but they have different allocPaths, labels and steps.
// Different allocators have different labels and steps.
allocator := NewAllocator(&AllocatorParams{
Client: client,
RootPath: rootPath,
AllocPath: allocPath + iStr,
Label: label + iStr,
Member: memberVal,
Step: step * uint64(i), // allocator 0, 1, 2 should have step size 1000 (default), 500, 1000 respectively.
Client: client,
Label: label,
Member: memberVal,
Step: step * i, // allocator 0, 1 should have step size 1000 (default), 500 respectively.
})
go func(re *require.Assertions, allocator Allocator) {
defer wg.Done()
testAllocator(re, allocator)
}(re, allocator)
i++
}
fn(DefaultLabel)
fn(KeyspaceLabel)
wg.Wait()
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ const (
// AllocStep set idAllocator's step when write persistent window boundary.
// Use a lower value for denser idAllocation in the event of frequent pd leader change.
AllocStep = uint64(100)
// AllocLabel is used to label keyspace idAllocator's metrics.
AllocLabel = "keyspace-idAlloc"
// regionLabelIDPrefix is used to prefix the keyspace region label.
regionLabelIDPrefix = "keyspaces/"
// regionLabelKey is the key for keyspace id in keyspace region label.
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,15 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.GetAdvertiseListenAddr()
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
s.participant = member.NewParticipant(s.GetClient(), constant.SchedulingServiceName)
s.participant = member.NewParticipant(s.GetClient(), keypath.MsParam{
ServiceName: constant.SchedulingServiceName,
})
p := &schedulingpb.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), constant.PrimaryKey, "primary election")
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), "primary election")

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
Expand Down
40 changes: 14 additions & 26 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
)

const (
// The timeout to wait transfer etcd leader to complete.
moveLeaderTimeout = 5 * time.Second
dcLocationConfigEtcdPrefix = "dc-location"
moveLeaderTimeout = 5 * time.Second
// If the campaign times is more than this value in `campaignTimesRecordTimeout`, the PD will resign and campaign again.
campaignLeaderFrequencyTimes = 3
)
Expand Down Expand Up @@ -160,8 +160,8 @@ func (m *EmbeddedEtcdMember) EnableLeader() {
}

// GetLeaderPath returns the path of the PD leader.
func (m *EmbeddedEtcdMember) GetLeaderPath() string {
return path.Join(m.rootPath, "leader")
func (*EmbeddedEtcdMember) GetLeaderPath() string {
return keypath.LeaderPath(nil)
}

// GetLeadership returns the leadership of the PD member.
Expand Down Expand Up @@ -384,13 +384,13 @@ func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string {
}

// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster.
func (m *EmbeddedEtcdMember) GetDCLocationPathPrefix() string {
return path.Join(m.rootPath, dcLocationConfigEtcdPrefix)
func (*EmbeddedEtcdMember) GetDCLocationPathPrefix() string {
return keypath.Prefix(keypath.DCLocationPath(nil, 0))
}

// GetDCLocationPath returns the dc-location path of a member with the given member ID.
func (m *EmbeddedEtcdMember) GetDCLocationPath(id uint64) string {
return path.Join(m.GetDCLocationPathPrefix(), fmt.Sprint(id))
func (*EmbeddedEtcdMember) GetDCLocationPath(id uint64) string {
return keypath.DCLocationPath(nil, id)
}

// SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader.
Expand Down Expand Up @@ -452,13 +452,9 @@ func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) {
return int(priority), nil
}

func (m *EmbeddedEtcdMember) getMemberBinaryDeployPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/deploy_path", id))
}

// GetMemberDeployPath loads a member's binary deploy path.
func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) {
key := m.getMemberBinaryDeployPath(id)
key := keypath.MemberBinaryDeployPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return "", err
Expand All @@ -471,7 +467,7 @@ func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) {

// SetMemberDeployPath saves a member's binary deploy path.
func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error {
key := m.getMemberBinaryDeployPath(id)
key := keypath.MemberBinaryDeployPath(id)
txn := kv.NewSlowLogTxn(m.client)
execPath, err := os.Executable()
deployPath := filepath.Dir(execPath)
Expand All @@ -488,17 +484,9 @@ func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error {
return nil
}

func (m *EmbeddedEtcdMember) getMemberGitHashPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/git_hash", id))
}

func (m *EmbeddedEtcdMember) getMemberBinaryVersionPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/binary_version", id))
}

// GetMemberBinaryVersion loads a member's binary version.
func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) {
key := m.getMemberBinaryVersionPath(id)
key := keypath.MemberBinaryVersionPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return "", err
Expand All @@ -511,7 +499,7 @@ func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) {

// GetMemberGitHash loads a member's git hash.
func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) {
key := m.getMemberGitHashPath(id)
key := keypath.MemberGitHashPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return "", err
Expand All @@ -524,7 +512,7 @@ func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) {

// SetMemberBinaryVersion saves a member's binary version.
func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion string) error {
key := m.getMemberBinaryVersionPath(id)
key := keypath.MemberBinaryVersionPath(id)
txn := kv.NewSlowLogTxn(m.client)
res, err := txn.Then(clientv3.OpPut(key, releaseVersion)).Commit()
if err != nil {
Expand All @@ -538,7 +526,7 @@ func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion st

// SetMemberGitHash saves a member's git hash.
func (m *EmbeddedEtcdMember) SetMemberGitHash(id uint64, gitHash string) error {
key := m.getMemberGitHashPath(id)
key := keypath.MemberGitHashPath(id)
txn := kv.NewSlowLogTxn(m.client)
res, err := txn.Then(clientv3.OpPut(key, gitHash)).Commit()
if err != nil {
Expand Down
Loading

0 comments on commit e7072cc

Please sign in to comment.