From 7fa19d3572b83505cf931fd3be7755ef513c51a4 Mon Sep 17 00:00:00 2001 From: husharp Date: Thu, 9 May 2024 11:03:16 +0800 Subject: [PATCH] check primary Signed-off-by: husharp --- pkg/election/leadership.go | 17 +++++++ pkg/mcs/discovery/discover.go | 62 ++++++++++++++++++++++++-- pkg/mcs/discovery/registry_entry.go | 2 + pkg/mcs/scheduling/server/server.go | 41 +++++++++++++++++ pkg/mcs/tso/server/server.go | 4 ++ pkg/mcs/utils/util.go | 49 ++++++++++++++++++++ pkg/member/member.go | 22 ++++----- pkg/member/participant.go | 18 ++++---- pkg/tso/allocator_manager.go | 4 ++ pkg/tso/global_allocator.go | 39 +++++++++++++++- server/apiv2/handlers/micro_service.go | 43 ++++++++++++++++++ 11 files changed, 277 insertions(+), 24 deletions(-) diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 02f519dbc75..755a7d6d331 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -64,6 +64,8 @@ type Leadership struct { leaderKey string leaderValue string + LeaderWatch bool + keepAliveCtx context.Context keepAliveCancelFunc context.CancelFunc keepAliveCancelFuncLock syncutil.Mutex @@ -72,6 +74,14 @@ type Leadership struct { campaignTimes []time.Time } +func (ls *Leadership) SetLeaderWatch(val bool) { + ls.LeaderWatch = val +} + +func (ls *Leadership) GetLeaderValue() string { + return ls.leaderValue +} + // NewLeadership creates a new Leadership. 
func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadership { leadership := &Leadership{ @@ -375,6 +385,12 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return } + // This is only hit when the API updates the leader key to transfer the leader. + if ev.Type == mvccpb.PUT && ls.LeaderWatch { + log.Info("[LeaderWatch] current leadership is updated", zap.Int64("watchRevision", revision), + zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) + return + } } revision = wresp.Header.Revision + 1 } @@ -393,4 +409,5 @@ func (ls *Leadership) Reset() { } ls.keepAliveCancelFuncLock.Unlock() ls.getLease().Close() + ls.LeaderWatch = false } diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 1ce5ecda51d..55d5732ec34 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -15,12 +15,15 @@ package discovery import ( + "math/rand" "strconv" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" @@ -45,14 +48,14 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er } // GetMSMembers returns all the members of the specified service name. 
-func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) { - switch name { +func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) { + switch serviceName { case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName: clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) if err != nil { return nil, err } - servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name) + servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName) resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit() if err != nil { return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause() @@ -75,5 +78,56 @@ func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, return entries, nil } - return nil, errors.Errorf("unknown service name %s", name) + return nil, errors.Errorf("unknown service name %s", serviceName) +} + +func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string) error { + log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary)) + entries, err := GetMSMembers(serviceName, client) + if err != nil { + return err + } + + // Do nothing when I am the only member of cluster. 
+ if len(entries) == 1 && newPrimary == "" { + return errors.New("no valid follower to transfer primary") + } + + var primaryIDs []string + var memberValues []string + for _, member := range entries { + if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.ServiceAddr == newPrimary) { + primaryIDs = append(primaryIDs, member.ServiceAddr) + memberValues = append(memberValues, string(member.MemberValue)) + } + } + if len(primaryIDs) == 0 { + return errors.New("no valid follower to transfer primary") + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + nextPrimaryID := r.Intn(len(primaryIDs)) + + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return errors.Errorf("failed to get cluster ID: %v", err) + } + + var primaryKey string + switch serviceName { + case utils.SchedulingServiceName: + primaryKey = endpoint.SchedulingPrimaryPath(clusterID) + case utils.TSOServiceName: + tsoRootPath := endpoint.TSOSvcRootPath(clusterID) + primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID) + } + + // update primary key to notify old primary server. + putResp, err := kv.NewSlowLogTxn(client). + Then(clientv3.OpPut(primaryKey, memberValues[nextPrimaryID])). 
+ Commit() + if err != nil || !putResp.Succeeded { + return errors.Errorf("failed to write primary flag for %s", serviceName) + } + return nil } diff --git a/pkg/mcs/discovery/registry_entry.go b/pkg/mcs/discovery/registry_entry.go index bf11ae5c8a4..ede9f12172e 100644 --- a/pkg/mcs/discovery/registry_entry.go +++ b/pkg/mcs/discovery/registry_entry.go @@ -23,11 +23,13 @@ import ( // ServiceRegistryEntry is the registry entry of a service type ServiceRegistryEntry struct { + Name string `json:"name"` ServiceAddr string `json:"service-addr"` Version string `json:"version"` GitHash string `json:"git-hash"` DeployPath string `json:"deploy-path"` StartTimestamp int64 `json:"start-timestamp"` + MemberValue []byte `json:"member-value"` } // Serialize this service registry entry diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 47a7cf9962b..adb72fd52c8 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -128,6 +128,10 @@ func (s *Server) GetBackendEndpoints() string { return s.cfg.BackendEndpoints } +func (s *Server) GetParticipant() *member.Participant { + return s.participant +} + // SetLogLevel sets log level. func (s *Server) SetLogLevel(level string) error { if !logutil.IsLevelLegal(level) { @@ -243,6 +247,17 @@ func (s *Server) primaryElectionLoop() { log.Info("the scheduling primary has changed, try to re-campaign a primary") } + // To make sure the expected leader (if it exists) and primary are on the same server. 
+ expectedPrimary := utils.GetExpectedPrimary(s.participant.GetLeaderPath(), s.GetClient()) + if expectedPrimary != "" && expectedPrimary != s.participant.GetLeadership().GetLeaderValue() { + log.Info("skip campaigning of scheduling primary and check later", + zap.String("server-name", s.Name()), + zap.String("target-primary-id", expectedPrimary), + zap.Uint64("member-id", s.participant.ID())) + time.Sleep(200 * time.Millisecond) + continue + } + s.campaignLeader() } } @@ -290,6 +305,9 @@ func (s *Server) campaignLeader() { member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1) log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) + exitPrimary := make(chan struct{}) + go s.primaryWatch(exitPrimary) + leaderTicker := time.NewTicker(utils.LeaderTickInterval) defer leaderTicker.Stop() @@ -304,10 +322,31 @@ func (s *Server) campaignLeader() { // Server is closed and it should return nil. log.Info("server is closed") return + case <-exitPrimary: + log.Info("no longer a primary/leader because primary have been updated, the scheduling primary/leader will step down") + return } } } +func (s *Server) primaryWatch(exitPrimary chan struct{}) { + _, revision, err := s.participant.GetPersistentLeader() + if err != nil { + log.Error("[primary] getting the leader meets error", errs.ZapError(err)) + return + } + log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader())) + // Watch will keep looping and never return unless the primary has changed. + s.participant.GetLeadership().SetLeaderWatch(true) + s.participant.GetLeadership().Watch(s.serverLoopCtx, revision+1) + s.participant.GetLeadership().SetLeaderWatch(false) + + utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath()) + + s.participant.UnsetLeader() + exitPrimary <- struct{}{} +} + // Close closes the server. 
func (s *Server) Close() { if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) { @@ -425,6 +464,7 @@ func (s *Server) startServer() (err error) { GitHash: versioninfo.PDGitHash, DeployPath: deployPath, StartTimestamp: s.StartTimestamp(), + Name: s.Name(), } uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) @@ -436,6 +476,7 @@ func (s *Server) startServer() (err error) { ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election") + s.serviceID.MemberValue = []byte(s.participant.MemberValue()) s.service = &Service{Server: s} s.AddServiceReadyCallback(s.startCluster) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c38c7142730..17f7312a8b7 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -380,6 +380,7 @@ func (s *Server) startServer() (err error) { GitHash: versioninfo.PDGitHash, DeployPath: deployPath, StartTimestamp: s.StartTimestamp(), + Name: s.Name(), } s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr, @@ -387,6 +388,9 @@ func (s *Server) startServer() (err error) { if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } + // Initialize the service ID with the member value of the primary of the default keyspace group. 
+ memberValue, err := s.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) + s.serviceID.MemberValue = []byte(memberValue.MemberValue()) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.service = &Service{Server: s} diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index b6ac2eb37e5..7bbebef5693 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -16,6 +16,7 @@ package utils import ( "context" + "github.com/tikv/pd/pkg/storage/kv" "net" "net/http" "os" @@ -51,6 +52,9 @@ const ( ClusterIDPath = "/pd/cluster_id" // retryInterval is the interval to retry. retryInterval = time.Second + // ExpectedPrimary is the path to store the expected primary + // ONLY SET VALUE BY API + ExpectedPrimary = "expected_primary" ) // InitClusterID initializes the cluster ID. @@ -70,6 +74,51 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes) } +// GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API. +func GetExpectedPrimary(keyPath string, client *clientv3.Client) string { + leader, err := etcdutil.GetValue(client, strings.Join([]string{keyPath, ExpectedPrimary}, "/")) + if err != nil { + log.Error("get expected primary key error", errs.ZapError(err)) + return "" + } + + return string(leader) +} + +// RemoveExpectedPrimary removes the expected primary key. +func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { + // remove expected leader key + resp, err := kv.NewSlowLogTxn(client). + Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))). + Commit() + if err != nil && !resp.Succeeded { + log.Error("change primary error", errs.ZapError(err)) + return + } +} + +// SetExpectedPrimary sets the expected primary key when the current primary has exited. 
+func SetExpectedPrimary(client *clientv3.Client, leaderPath string) { + // write a flag to indicate the current primary has exited + leaderRaw, err := etcdutil.GetValue(client, leaderPath) + if err != nil { + log.Error("[primary] get primary key error", zap.Error(err)) + return + } + + // write a flag to indicate the current primary has exited + resp, err := kv.NewSlowLogTxn(client). + Then( + clientv3.OpPut(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"), string(leaderRaw)), + // indicate the current primary has exited + clientv3.OpDelete(leaderPath)). + Commit() + if err != nil && !resp.Succeeded { + log.Error("change primary error", errs.ZapError(err)) + return + } +} + // PromHandler is a handler to get prometheus metrics. func PromHandler() gin.HandlerFunc { return func(c *gin.Context) { diff --git a/pkg/member/member.go b/pkg/member/member.go index af504d83963..4522eb7ae33 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -148,8 +148,8 @@ func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) { m.lastLeaderUpdatedTime.Store(time.Now()) } -// unsetLeader unsets the member's PD leader. -func (m *EmbeddedEtcdMember) unsetLeader() { +// UnsetLeader unsets the member's PD leader. +func (m *EmbeddedEtcdMember) UnsetLeader() { m.leader.Store(&pdpb.Member{}) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -210,8 +210,8 @@ func (m *EmbeddedEtcdMember) PreCheckLeader() error { return nil } -// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). -func (m *EmbeddedEtcdMember) getPersistentLeader() (*pdpb.Member, int64, error) { +// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). 
+func (m *EmbeddedEtcdMember) GetPersistentLeader() (any, int64, error) { leader := &pdpb.Member{} ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { @@ -233,17 +233,17 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) { return nil, true } - leader, revision, err := m.getPersistentLeader() + leaderRaw, revision, err := m.GetPersistentLeader() if err != nil { log.Error("getting pd leader meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, true } - if leader == nil { + if leaderRaw == nil { // no leader yet return nil, false } - + leader := leaderRaw.(*pdpb.Member) if m.IsSameLeader(leader) { // oh, we are already a PD leader, which indicates we may meet something wrong // in previous CampaignLeader. We should delete the leadership and campaign again. @@ -269,14 +269,14 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) { func (m *EmbeddedEtcdMember) WatchLeader(ctx context.Context, leader *pdpb.Member, revision int64) { m.setLeader(leader) m.leadership.Watch(ctx, revision) - m.unsetLeader() + m.UnsetLeader() } // ResetLeader is used to reset the PD member's current leadership. // Basically it will reset the leader lease and unset leader info. func (m *EmbeddedEtcdMember) ResetLeader() { m.leadership.Reset() - m.unsetLeader() + m.UnsetLeader() } // CheckPriority checks whether the etcd leader should be moved according to the priority. @@ -324,8 +324,8 @@ func (m *EmbeddedEtcdMember) GetEtcdLeader() uint64 { } // IsSameLeader checks whether a server is the leader itself. -func (m *EmbeddedEtcdMember) IsSameLeader(leader *pdpb.Member) bool { - return leader.GetMemberId() == m.ID() +func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool { + return leader.(*pdpb.Member).GetMemberId() == m.ID() } // InitMemberInfo initializes the member info. 
diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 8a0ffadd31e..f74d17aee22 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -154,8 +154,8 @@ func (m *Participant) setLeader(member participant) { m.lastLeaderUpdatedTime.Store(time.Now()) } -// unsetLeader unsets the member's leader. -func (m *Participant) unsetLeader() { +// UnsetLeader unsets the member's leader. +func (m *Participant) UnsetLeader() { leader := NewParticipantByService(m.serviceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) @@ -164,6 +164,7 @@ func (m *Participant) unsetLeader() { // EnableLeader declares the member itself to be the leader. func (m *Participant) EnableLeader() { m.setLeader(m.member) + utils.RemoveExpectedPrimary(m.client, m.GetLeaderPath()) } // GetLeaderPath returns the path of the leader. @@ -205,8 +206,8 @@ func (*Participant) PreCheckLeader() error { return nil } -// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). -func (m *Participant) getPersistentLeader() (participant, int64, error) { +// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). 
+func (m *Participant) GetPersistentLeader() (any, int64, error) { leader := NewParticipantByService(m.serviceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { @@ -228,17 +229,18 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) { return nil, true } - leader, revision, err := m.getPersistentLeader() + leaderRaw, revision, err := m.GetPersistentLeader() if err != nil { log.Error("getting the leader meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, true } - if leader == nil { + if leaderRaw == nil { // no leader yet return nil, false } + leader := leaderRaw.(participant) if m.IsSameLeader(leader) { // oh, we are already the leader, which indicates we may meet something wrong // in previous CampaignLeader. We should delete the leadership and campaign again. @@ -264,14 +266,14 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) { func (m *Participant) WatchLeader(ctx context.Context, leader participant, revision int64) { m.setLeader(leader) m.leadership.Watch(ctx, revision) - m.unsetLeader() + m.UnsetLeader() } // ResetLeader is used to reset the member's current leadership. // Basically it will reset the leader lease and unset leader info. func (m *Participant) ResetLeader() { m.leadership.Reset() - m.unsetLeader() + m.UnsetLeader() } // IsSameLeader checks whether a server is the leader itself. diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index f1683de1352..e3a0a8302f3 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -145,6 +145,10 @@ type ElectionMember interface { GetDCLocationPath(id uint64) string // PreCheckLeader does some pre-check before checking whether it's the leader. PreCheckLeader() error + // GetPersistentLeader returns the persistent leader. + GetPersistentLeader() (any, int64, error) + // UnsetLeader unsets the member's leader. 
+ UnsetLeader() } // AllocatorManager is used to manage the TSO Allocators a PD server holds. diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index f90dc5f26fe..1ca854dfed9 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -560,6 +560,17 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) } + // To make sure the expected leader (if it exists) and primary are on the same server. + targetPrimary := mcsutils.GetExpectedPrimary(gta.member.GetLeaderPath(), gta.member.Client()) + if targetPrimary != "" && targetPrimary != gta.member.GetLeadership().GetLeaderValue() { + log.Info("skip campaigning of scheduling primary and check later", + zap.String("server-name", gta.member.Name()), + zap.String("target-primary-id", targetPrimary), + zap.Uint64("member-id", gta.member.ID())) + time.Sleep(200 * time.Millisecond) + continue + } + gta.campaignLeader() } } @@ -596,7 +607,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { gta.member.ResetLeader() }) - // maintain the the leadership, after this, TSO can be service. + // maintain the leadership, after this, TSO can be served. 
gta.member.KeepLeader(ctx) log.Info("campaign tso primary ok", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), @@ -635,6 +646,9 @@ func (gta *GlobalTSOAllocator) campaignLeader() { logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("tso-primary-name", gta.member.Name())) + exitPrimary := make(chan struct{}) + go gta.primaryWatch(exitPrimary) + leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval) defer leaderTicker.Stop() @@ -651,10 +665,33 @@ func (gta *GlobalTSOAllocator) campaignLeader() { log.Info("exit leader campaign", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) return + case <-exitPrimary: + log.Info("no longer a primary because primary have been updated, the TSO primary/leader will step down") + return } } } +func (gta *GlobalTSOAllocator) primaryWatch(exitPrimary chan struct{}) { + _, revision, err := gta.member.GetPersistentLeader() + if err != nil { + log.Error("[primary] getting the leader meets error", errs.ZapError(err)) + return + } + log.Info("[primary] start to watch the primary", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.String("campaign-tso-primary-name", gta.member.Name())) + // Watch will keep looping and never return unless the primary has changed. 
+ gta.member.GetLeadership().SetLeaderWatch(true) + gta.member.GetLeadership().Watch(gta.ctx, revision+1) + gta.member.GetLeadership().SetLeaderWatch(false) + + mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) + + gta.member.UnsetLeader() + exitPrimary <- struct{}{} +} + func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics { return gta.timestampOracle.metrics } diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index fd44665530f..07fa53224d2 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -28,6 +28,7 @@ func RegisterMicroService(r *gin.RouterGroup) { router := r.Group("ms") router.GET("members/:service", GetMembers) router.GET("primary/:service", GetPrimary) + router.POST("primary/transfer/:service", TransferPrimary) } // GetMembers gets all members of the cluster for the specified service. @@ -77,3 +78,45 @@ func GetPrimary(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, "please specify service") } + +// TransferPrimary transfers the primary member of the specified service. +// @Tags primary +// @Summary Transfer the primary member of the specified service. 
+// @Produce json +// @Param service path string true "service name" +// @Param new_primary query string false "new primary address" +// @Success 200 {object} string +// @Router /ms/primary/transfer/{service} [post] +func TransferPrimary(c *gin.Context) { + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + if !svr.IsAPIServiceMode() { + c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") + return + } + + if service := c.Param("service"); len(service) > 0 { + var input map[string]string + if err := c.Bind(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + newPrimary := "" + if v, ok := input["new_primary"]; ok { + newPrimary = v + } + oldPrimary, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service) + if oldPrimary == newPrimary { + c.AbortWithStatusJSON(http.StatusInternalServerError, "new primary is the same as the old one") + return + } + if err := discovery.TransferPrimary(svr.GetClient(), service, oldPrimary, newPrimary); err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, "success") + return + } + + c.AbortWithStatusJSON(http.StatusInternalServerError, "please specify service") +}