Skip to content

Commit

Permalink
transfer primary
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed May 8, 2024
1 parent a3c5950 commit 0ee6a64
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 50 deletions.
4 changes: 2 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (ls *Leadership) getLease() *lease {
return l.(*lease)
}

func (ls *Leadership) setLease(lease *lease) {
func (ls *Leadership) SetLease(lease *lease) {
ls.lease.Store(lease)
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
client: ls.client,
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)
ls.SetLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
var member pdpb.Member
Expand Down
8 changes: 8 additions & 0 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ type lease struct {
expireTime atomic.Value
}

func NewLease(client *clientv3.Client, purpose string) *lease {
return &lease{
Purpose: purpose,
client: client,
lease: clientv3.NewLease(client),
}
}

// Grant uses `lease.Grant` to initialize the lease and expireTime.
func (l *lease) Grant(leaseTimeout int64) error {
if l == nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}

// GetMSMembers returns all the members of the specified service name.
func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch name {
func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch serviceName {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
Expand All @@ -75,5 +75,5 @@ func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry,
return entries, nil
}

return nil, errors.Errorf("unknown service name %s", name)
return nil, errors.Errorf("unknown service name %s", serviceName)
}
49 changes: 48 additions & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
Expand All @@ -39,11 +40,14 @@ import (
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/unrolled/render"
"go.etcd.io/etcd/clientv3"
)

// APIPathPrefix is the prefix of the API path.
Expand Down Expand Up @@ -112,6 +116,7 @@ func NewService(srv *scheserver.Service) *Service {
rd: createIndentRender(),
}
s.RegisterAdminRouter()
s.RegisterMemberRouter()
s.RegisterConfigRouter()
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
Expand All @@ -130,7 +135,13 @@ func (s *Service) RegisterAdminRouter() {
router.DELETE("cache/regions/:id", deleteRegionCacheByID)
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
// RegisterMemberRouter registers the router of the member handler.
func (s *Service) RegisterMemberRouter() {
router := s.root.Group("member")
router.POST("/primary/transfer", transferPrimary)
}

// RegisterSchedulersRouter registers the router of the schedulers' handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
Expand Down Expand Up @@ -259,6 +270,42 @@ func getConfig(c *gin.Context) {
c.IndentedJSON(http.StatusOK, cfg)
}

func transferPrimary(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
if svr.IsServing() {
c.AbortWithStatusJSON(http.StatusInternalServerError, "now is primary")
return
}

newLease := election.NewLease(svr.GetClient(), "primary election")
if err := newLease.Grant(mcsutils.DefaultLeaderLease); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, "newLease grant error")
}

// remove primary lease
primaryKey := endpoint.SchedulingPrimaryPath(svr.GetClusterID())
deleteResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpDelete(primaryKey),
).Commit()
if err != nil || !deleteResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "delete resp error")
}

memberValue := svr.GetParticipant()
memberValue.GetLeadership().SetLease(newLease)
putResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpPut(primaryKey, memberValue.MemberValue(), clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID))),
).
Commit()
if err != nil || !putResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "put resp error")
}

c.IndentedJSON(http.StatusOK, "transfer submitted!")
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
Expand Down
54 changes: 46 additions & 8 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

func (s *Server) GetClusterID() uint64 {
return s.clusterID
}

func (s *Server) GetParticipant() *member.Participant {
return s.participant
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
Expand Down Expand Up @@ -249,18 +257,35 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
} else {
log.Error("campaign scheduling primary meets error due to etcd error",
zap.String("campaign-scheduling-primary-name", s.participant.Name()),
errs.ZapError(err))
leader, _, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("getting the leader meets error", errs.ZapError(err))
return
}
if leader != nil && !s.participant.IsSameLeader(leader) {
leader, ok := leader.(*schedulingpb.Participant)
if !ok {
log.Error("failed to get the leader", zap.Any("leader", leader))
return
}
log.Info("the scheduling primary/leader is already elected", zap.Stringer("scheduling-primary", leader))
return
}

if leader == nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
} else {
log.Error("campaign scheduling primary meets error due to etcd error",
zap.String("campaign-scheduling-primary-name", s.participant.Name()),
errs.ZapError(err))
}
return
}
}

// Start keepalive the leadership and enable Scheduling service.
ctx, cancel := context.WithCancel(s.serverLoopCtx)
var resetLeaderOnce sync.Once
Expand Down Expand Up @@ -290,6 +315,19 @@ func (s *Server) campaignLeader() {
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

go func() {
log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader()))
_, revision, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
}
// Watch will keep looping and never return unless the primary/leader has changed.
s.participant.GetLeadership().Watch(s.serverLoopCtx, revision)
s.participant.UnsetLeader()
log.Info("[primary] the scheduling primary has changed, try to re-campaign a primary")
}()

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
defer leaderTicker.Stop()

Expand Down
50 changes: 50 additions & 0 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/unrolled/render"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -94,6 +97,7 @@ func NewService(srv *tsoserver.Service) *Service {
rd: createIndentRender(),
}
s.RegisterAdminRouter()
s.RegisterMemberRouter()
s.RegisterKeyspaceGroupRouter()
s.RegisterHealthRouter()
s.RegisterConfigRouter()
Expand All @@ -107,6 +111,12 @@ func (s *Service) RegisterAdminRouter() {
router.PUT("/log", changeLogLevel)
}

// RegisterMemberRouter registers the router of the member handler.
func (s *Service) RegisterMemberRouter() {
router := s.root.Group("member")
router.POST("/primary/transfer", transferPrimary)
}

// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
func (s *Service) RegisterKeyspaceGroupRouter() {
router := s.root.Group("keyspace-groups")
Expand Down Expand Up @@ -141,6 +151,46 @@ func changeLogLevel(c *gin.Context) {
c.String(http.StatusOK, "The log level is updated.")
}

func transferPrimary(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
if svr.IsServing() {
c.AbortWithStatusJSON(http.StatusInternalServerError, "now is primary")
return
}

newLease := election.NewLease(svr.GetClient(), "transfer-primary")
if err := newLease.Grant(utils.DefaultLeaderLease); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, "newLease grant error")
}

tsoRootPath := endpoint.TSOSvcRootPath(svr.GetClusterID())
primaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID)
// delete previous primary firstly
deleteResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpDelete(primaryKey),
).Commit()
if err != nil || !deleteResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "delete resp error")
}

memberValue, err := svr.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
memberValue.GetLeadership().SetLease(newLease)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, "get tso member")
}
putResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpPut(primaryKey, memberValue.MemberValue(), clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID))),
).
Commit()
if err != nil || !putResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "put resp error")
}

c.IndentedJSON(http.StatusOK, "transfer submitted!")
}

// ResetTSParams is the input json body params of ResetTS
type ResetTSParams struct {
TSO string `json:"tso"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (s *Server) ServerLoopWgAdd(n int) {
s.serverLoopWg.Add(n)
}

func (s *Server) GetClusterID() uint64 {
return s.clusterID
}

// SetUpRestHandler sets up the REST handler.
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) {
return SetUpRestHandler(s.service)
Expand Down
22 changes: 11 additions & 11 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) {
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's PD leader.
func (m *EmbeddedEtcdMember) unsetLeader() {
// UnsetLeader unsets the member's PD leader.
func (m *EmbeddedEtcdMember) UnsetLeader() {
m.leader.Store(&pdpb.Member{})
m.lastLeaderUpdatedTime.Store(time.Now())
}
Expand Down Expand Up @@ -210,8 +210,8 @@ func (m *EmbeddedEtcdMember) PreCheckLeader() error {
return nil
}

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *EmbeddedEtcdMember) getPersistentLeader() (*pdpb.Member, int64, error) {
// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *EmbeddedEtcdMember) GetPersistentLeader() (any, int64, error) {
leader := &pdpb.Member{}
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
Expand All @@ -233,17 +233,17 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) {
return nil, true
}

leader, revision, err := m.getPersistentLeader()
leaderRaw, revision, err := m.GetPersistentLeader()
if err != nil {
log.Error("getting pd leader meets error", errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
return nil, true
}
if leader == nil {
if leaderRaw == nil {
// no leader yet
return nil, false
}

leader := leaderRaw.(*pdpb.Member)
if m.IsSameLeader(leader) {
// oh, we are already a PD leader, which indicates we may meet something wrong
// in previous CampaignLeader. We should delete the leadership and campaign again.
Expand All @@ -269,14 +269,14 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) {
func (m *EmbeddedEtcdMember) WatchLeader(ctx context.Context, leader *pdpb.Member, revision int64) {
m.setLeader(leader)
m.leadership.Watch(ctx, revision)
m.unsetLeader()
m.UnsetLeader()
}

// ResetLeader is used to reset the PD member's current leadership.
// Basically it will reset the leader lease and unset leader info.
func (m *EmbeddedEtcdMember) ResetLeader() {
m.leadership.Reset()
m.unsetLeader()
m.UnsetLeader()
}

// CheckPriority checks whether the etcd leader should be moved according to the priority.
Expand Down Expand Up @@ -324,8 +324,8 @@ func (m *EmbeddedEtcdMember) GetEtcdLeader() uint64 {
}

// IsSameLeader checks whether a server is the leader itself.
func (m *EmbeddedEtcdMember) IsSameLeader(leader *pdpb.Member) bool {
return leader.GetMemberId() == m.ID()
func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool {
return leader.(*pdpb.Member).GetMemberId() == m.ID()
}

// InitMemberInfo initializes the member info.
Expand Down
Loading

0 comments on commit 0ee6a64

Please sign in to comment.