From dbe75c02a4214cc2c4a02e8d1b846816a224f835 Mon Sep 17 00:00:00 2001 From: Hu# Date: Wed, 14 Aug 2024 09:46:36 +0800 Subject: [PATCH] pdms: Choose a suitable pdms to transfer primary when upgrade (#5643) --- pkg/controller/pd_control.go | 14 ++++ pkg/manager/member/pd_ms_upgrader.go | 88 ++++++++++++++++++++++- pkg/manager/member/pd_ms_upgrader_test.go | 20 +++++- pkg/manager/member/utils.go | 15 ++++ pkg/pdapi/fake_pdapi.go | 48 +++++++++++++ pkg/pdapi/pd_control.go | 14 +++- pkg/pdapi/pdapi.go | 17 +++++ pkg/pdapi/pdms_api.go | 25 ++++++- 8 files changed, 234 insertions(+), 7 deletions(-) diff --git a/pkg/controller/pd_control.go b/pkg/controller/pd_control.go index 53418aa39b..b42f88bb0a 100644 --- a/pkg/controller/pd_control.go +++ b/pkg/controller/pd_control.go @@ -113,6 +113,20 @@ func NewFakePDClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) * return pdClient } +// NewFakePDMSClient creates a fake pdmsclient that is set as the pdms client +func NewFakePDMSClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster, curService string) *pdapi.FakePDMSClient { + pdmsClient := pdapi.NewFakePDMSClient() + if tc.Spec.Cluster != nil { + pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.Spec.Cluster.Namespace), tc.Spec.Cluster.Name, tc.Spec.Cluster.ClusterDomain, curService, pdmsClient) + } + if tc.Spec.ClusterDomain != "" { + pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.Spec.ClusterDomain, curService, pdmsClient) + } + pdControl.SetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), curService, pdmsClient) + + return pdmsClient +} + // NewFakePDClientWithAddress creates a fake pdclient that is set as the pd client func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string) *pdapi.FakePDClient { pdClient := pdapi.NewFakePDClient() diff --git a/pkg/manager/member/pd_ms_upgrader.go b/pkg/manager/member/pd_ms_upgrader.go index a00653f5de..3c99ad6499 100644 --- a/pkg/manager/member/pd_ms_upgrader.go +++ b/pkg/manager/member/pd_ms_upgrader.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/controller" mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils" "github.com/pingcap/tidb-operator/pkg/third_party/k8s" + "github.com/pingcap/tidb-operator/pkg/util/cmpver" apps "k8s.io/api/apps/v1" "k8s.io/klog/v2" ) @@ -120,12 +121,95 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St } continue } - mngerutils.SetUpgradePartition(newSet, i) - return nil + + return u.upgradePDMSPod(tc, i, newSet, curService) } return nil } +func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, newSet *apps.StatefulSet, curService string) error { + // Only support after `8.3.0` to keep compatibility. + if check, err := pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion(curService)); check && err == nil { + ns := tc.GetNamespace() + tcName := tc.GetName() + upgradePDMSName := PDMSName(tcName, ordinal, tc.Namespace, tc.Spec.ClusterDomain, tc.Spec.AcrossK8s, curService) + upgradePodName := PDMSPodName(tcName, ordinal, curService) + + pdClient := controller.GetPDClient(u.deps.PDControl, tc) + primary, err := pdClient.GetMSPrimary(curService) + if err != nil { + return err + } + + klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName, + primary, upgradePDMSName, upgradePodName) + // If current pdms is primary, transfer primary to other pdms pod + if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) { + targetName := "" + + if tc.PDMSStsActualReplicas(curService) > 1 { + targetName = choosePDMSToTransferFromMembers(tc, newSet, ordinal) + } + + if targetName != "" { + klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName) + err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName) + if err != nil { + klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err) + return err + } + klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName) + } else { + klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName) + } + } + } + + mngerutils.SetUpgradePartition(newSet, ordinal) + return nil +} + +// choosePDMSToTransferFromMembers choose a pdms to transfer primary from members +// +// Assume that current primary ordinal is x, and range is [0, n] +// 1. Find the max suitable ordinal in (x, n], because they have been upgraded +// 2. If no suitable ordinal, find the min suitable ordinal in [0, x) to reduce the count of transfer +func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string { + ns := tc.GetNamespace() + tcName := tc.GetName() + klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName) + ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet) + + // set ordinal to max ordinal if ordinal isn't exist + if !ordinals.Has(ordinal) { + ordinal = helper.GetMaxPodOrdinal(*newSet.Spec.Replicas, newSet) + } + + targetName := "" + list := ordinals.List() + if len(list) == 0 { + return "" + } + + // just using pods index for now. TODO: add healthy checker for pdms. + // find the maximum ordinal which is larger than ordinal + if len(list) > int(ordinal)+1 { + targetName = PDMSPodName(tcName, list[len(list)-1], controller.PDMSTrimName(newSet.Name)) + } + + if targetName == "" && ordinal != 0 { + // find the minimum ordinal which is less than ordinal + targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name)) + } + + klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName) + return targetName +} + +// PDMSSupportMicroServicesWithName returns true if the given version of PDMS supports microservices with name. +// related https://github.com/tikv/pd/pull/8157. +var pdMSSupportMicroServicesWithName, _ = cmpver.NewConstraint(cmpver.GreaterOrEqual, "v8.3.0") + type fakePDMSUpgrader struct{} // NewFakePDMSUpgrader returns a fakePDUpgrader diff --git a/pkg/manager/member/pd_ms_upgrader_test.go b/pkg/manager/member/pd_ms_upgrader_test.go index 19152d69b3..b6b0f7adfb 100644 --- a/pkg/manager/member/pd_ms_upgrader_test.go +++ b/pkg/manager/member/pd_ms_upgrader_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" "github.com/pingcap/tidb-operator/pkg/controller" mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils" + "github.com/pingcap/tidb-operator/pkg/pdapi" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,8 +45,20 @@ func TestPDMSUpgraderUpgrade(t *testing.T) { testFn := func(test *testcase) { t.Log(test.name) - upgrader, podInformer := newPDMSUpgrader() + upgrader, pdControl, podInformer := newPDMSUpgrader() tc := newTidbClusterForPDMSUpgrader() + pdClient := controller.NewFakePDClient(pdControl, tc) + pdMSClient := controller.NewFakePDMSClient(pdControl, tc, "tso") + + pdClient.AddReaction(pdapi.GetPDMSPrimaryActionType, func(action *pdapi.Action) (interface{}, error) { + return "upgrader-tso-1", nil + }) + pdMSClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) { + return nil, nil + }) + pdMSClient.AddReaction(pdapi.PDMSTransferPrimaryActionType, func(action *pdapi.Action) (interface{}, error) { + return nil, nil + }) if test.changeFn != nil { test.changeFn(tc) @@ -218,11 +231,12 @@ func TestPDMSUpgraderUpgrade(t *testing.T) { } } -func newPDMSUpgrader() (Upgrader, podinformers.PodInformer) { +func newPDMSUpgrader() (Upgrader, *pdapi.FakePDControl, podinformers.PodInformer) { fakeDeps := controller.NewFakeDependencies() pdMSUpgrader := &pdMSUpgrader{deps: fakeDeps} podInformer := fakeDeps.KubeInformerFactory.Core().V1().Pods() - return pdMSUpgrader, podInformer + pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl) + return pdMSUpgrader, pdControl, podInformer } func newStatefulSetForPDMSUpgrader() *apps.StatefulSet { diff --git a/pkg/manager/member/utils.go b/pkg/manager/member/utils.go index 898ee77ec2..c7cda0ba53 100644 --- a/pkg/manager/member/utils.go +++ b/pkg/manager/member/utils.go @@ -160,6 +160,21 @@ func PdName(tcName string, ordinal int32, namespace string, clusterDomain string return PdPodName(tcName, ordinal) } +// PDMSName should match the start arg `--name` of pd-server +// See the start script of PDMS in pkg/manager/member/startscript/v2.renderPDMSStartScript +func PDMSName(tcName string, ordinal int32, namespace, clusterDomain string, acrossK8s bool, component string) string { + if len(clusterDomain) > 0 { + return fmt.Sprintf("%s.%s-%s-peer.%s.svc.%s", PDMSPodName(tcName, ordinal, component), component, tcName, namespace, clusterDomain) + } + + // clusterDomain is not set + if acrossK8s { + return fmt.Sprintf("%s.%s-%s-peer.%s.svc", PDMSPodName(tcName, ordinal, component), component, tcName, namespace) + } + + return PDMSPodName(tcName, ordinal, component) +} + // NeedForceUpgrade check if force upgrade is necessary func NeedForceUpgrade(ann map[string]string) bool { // Check if annotation 'pingcap.com/force-upgrade: "true"' is set diff --git a/pkg/pdapi/fake_pdapi.go b/pkg/pdapi/fake_pdapi.go index e451b910ed..80138fe4a5 100644 --- a/pkg/pdapi/fake_pdapi.go +++ b/pkg/pdapi/fake_pdapi.go @@ -28,6 +28,7 @@ const ( GetClusterActionType ActionType = "GetCluster" GetMembersActionType ActionType = "GetMembers" GetPDMSMembersActionType ActionType = "GetPDMSMembers" + GetPDMSPrimaryActionType ActionType = "GetPDMSPrimary" GetStoresActionType ActionType = "GetStores" GetTombStoneStoresActionType ActionType = "GetTombStoneStores" GetStoreActionType ActionType = "GetStore" @@ -45,6 +46,7 @@ const ( TransferPDLeaderActionType ActionType = "TransferPDLeader" GetAutoscalingPlansActionType ActionType = "GetAutoscalingPlans" GetRecoveringMarkActionType ActionType = "GetRecoveringMark" + PDMSTransferPrimaryActionType ActionType = "PDMSTransferPrimary" ) type NotFoundReaction struct { @@ -78,6 +80,15 @@ func (c *FakePDClient) GetMSMembers(_ string) ([]string, error) { return result.([]string), nil } +func (c *FakePDClient) GetMSPrimary(_ string) (string, error) { + action := &Action{} + result, err := c.fakeAPI(GetPDMSPrimaryActionType, action) + if err != nil { + return "", err + } + return result.(string), nil +} + func NewFakePDClient() *FakePDClient { return &FakePDClient{reactions: map[ActionType]Reaction{}} } @@ -291,3 +302,40 @@ func (c *FakePDClient) GetRecoveringMark() (bool, error) { return true, nil } + +// FakePDMSClient implements a fake version of PDMSClient. +type FakePDMSClient struct { + reactions map[ActionType]Reaction +} + +func NewFakePDMSClient() *FakePDMSClient { + return &FakePDMSClient{reactions: map[ActionType]Reaction{}} +} + +func (c *FakePDMSClient) AddReaction(actionType ActionType, reaction Reaction) { + c.reactions[actionType] = reaction +} + +// fakeAPI is a small helper for fake API calls +func (c *FakePDMSClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) { + if reaction, ok := c.reactions[actionType]; ok { + result, err := reaction(action) + if err != nil { + return nil, err + } + return result, nil + } + return nil, &NotFoundReaction{actionType} +} + +func (c *FakePDMSClient) GetHealth() error { + action := &Action{} + _, err := c.fakeAPI(GetHealthActionType, action) + return err +} + +func (c *FakePDMSClient) TransferPrimary(newPrimary string) error { + action := &Action{Name: newPrimary} + _, err := c.fakeAPI(PDMSTransferPrimaryActionType, action) + return err +} diff --git a/pkg/pdapi/pd_control.go b/pkg/pdapi/pd_control.go index 667300b8e8..811f7d6c01 100644 --- a/pkg/pdapi/pd_control.go +++ b/pkg/pdapi/pd_control.go @@ -337,7 +337,7 @@ type FakePDControl struct { func NewFakePDControl(secretLister corelisterv1.SecretLister) *FakePDControl { return &FakePDControl{ - defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}}, + defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}, pdMSClients: map[string]PDMSClient{}}, } } @@ -352,3 +352,15 @@ func (fpc *FakePDControl) SetPDClientWithClusterDomain(namespace Namespace, tcNa func (fpc *FakePDControl) SetPDClientWithAddress(peerURL string, pdclient PDClient) { fpc.defaultPDControl.pdClients[peerURL] = pdclient } + +func (fpc *FakePDControl) SetPDMSClient(namespace Namespace, tcName, curService string, pdmsclient PDMSClient) { + fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", "", curService, false)] = pdmsclient +} + +func (fpc *FakePDControl) SetPDMSClientWithClusterDomain(namespace Namespace, tcName, tcClusterDomain, curService string, pdmsclient PDMSClient) { + fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", tcClusterDomain, curService, false)] = pdmsclient +} + +func (fpc *FakePDControl) SetPDMSClientWithAddress(peerURL string, pdmsclient PDMSClient) { + fpc.defaultPDControl.pdMSClients[peerURL] = pdmsclient +} diff --git a/pkg/pdapi/pdapi.go b/pkg/pdapi/pdapi.go index 5f2fe17f38..4a633d6906 100644 --- a/pkg/pdapi/pdapi.go +++ b/pkg/pdapi/pdapi.go @@ -96,6 +96,8 @@ type PDClient interface { GetRecoveringMark() (bool, error) // GetMSMembers returns all PDMS members service-addr from cluster by specific Micro Service GetMSMembers(service string) ([]string, error) + // GetMSPrimary returns the primary PDMS member service-addr from cluster by specific Micro Service + GetMSPrimary(service string) (string, error) } var ( @@ -341,6 +343,21 @@ func (c *pdClient) GetMSMembers(service string) ([]string, error) { return addrs, nil } +func (c *pdClient) GetMSPrimary(service string) (string, error) { + apiURL := fmt.Sprintf("%s/%s/primary/%s", c.url, MicroServicePrefix, service) + body, err := httputil.GetBodyOK(c.httpClient, apiURL) + if err != nil { + return "", err + } + var primary string + err = json.Unmarshal(body, &primary) + if err != nil { + return "", err + } + + return primary, nil +} + func (c *pdClient) getStores(apiURL string) (*StoresInfo, error) { body, err := httputil.GetBodyOK(c.httpClient, apiURL) if err != nil { diff --git a/pkg/pdapi/pdms_api.go b/pkg/pdapi/pdms_api.go index 1a948e0692..b8c8c77482 100644 --- a/pkg/pdapi/pdms_api.go +++ b/pkg/pdapi/pdms_api.go @@ -14,7 +14,9 @@ package pdapi import ( + "bytes" "crypto/tls" + "encoding/json" "fmt" "net/http" "time" @@ -27,10 +29,13 @@ import ( type PDMSClient interface { // GetHealth returns ping result GetHealth() error + // TransferPrimary transfers the primary to the newPrimary + TransferPrimary(newPrimary string) error } var ( - pdMSHealthPrefix = "api/v1/health" + pdMSHealthPrefix = "api/v1/health" + pdMSPrimaryTransferPrefix = "api/v1/primary/transfer" ) // pdMSClient is default implementation of PDClient @@ -69,3 +74,21 @@ func (c *pdMSClient) GetHealth() error { } return nil } + +func (c *pdMSClient) TransferPrimary(newPrimary string) error { + apiURL := fmt.Sprintf("%s/%s/%s", c.url, c.serviceName, pdMSPrimaryTransferPrefix) + data, err := json.Marshal(struct { + NewPrimary string `json:"new_primary"` + }{ + NewPrimary: newPrimary, + }) + if err != nil { + return err + } + _, err = httputil.PostBodyOK(c.httpClient, apiURL, bytes.NewBuffer(data)) + if err != nil { + return err + } + + return nil +}