Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdms: Choose a suitable pdms to transfer primary when upgrade (#5643) #5709

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ func NewFakePDClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *
return pdClient
}

// NewFakePDMSClient creates a fake pdmsclient that is set as the pdms client
func NewFakePDMSClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster, curService string) *pdapi.FakePDMSClient {
pdmsClient := pdapi.NewFakePDMSClient()
if tc.Spec.Cluster != nil {
pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.Spec.Cluster.Namespace), tc.Spec.Cluster.Name, tc.Spec.Cluster.ClusterDomain, curService, pdmsClient)
}
if tc.Spec.ClusterDomain != "" {
pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.Spec.ClusterDomain, curService, pdmsClient)
}
pdControl.SetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), curService, pdmsClient)

return pdmsClient
}

// NewFakePDClientWithAddress creates a fake pdclient that is set as the pd client
func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string) *pdapi.FakePDClient {
pdClient := pdapi.NewFakePDClient()
Expand Down
88 changes: 86 additions & 2 deletions pkg/manager/member/pd_ms_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
"github.com/pingcap/tidb-operator/pkg/util/cmpver"
apps "k8s.io/api/apps/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -120,12 +121,95 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
}
continue
}
mngerutils.SetUpgradePartition(newSet, i)
return nil

return u.upgradePDMSPod(tc, i, newSet, curService)
}
return nil
}

func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, newSet *apps.StatefulSet, curService string) error {
// Only support after `8.3.0` to keep compatibility.
if check, err := pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion(curService)); check && err == nil {
ns := tc.GetNamespace()
tcName := tc.GetName()
upgradePDMSName := PDMSName(tcName, ordinal, tc.Namespace, tc.Spec.ClusterDomain, tc.Spec.AcrossK8s, curService)
upgradePodName := PDMSPodName(tcName, ordinal, curService)

pdClient := controller.GetPDClient(u.deps.PDControl, tc)
primary, err := pdClient.GetMSPrimary(curService)
if err != nil {
return err
}

klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
primary, upgradePDMSName, upgradePodName)
// If current pdms is primary, transfer primary to other pdms pod
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
targetName := ""

if tc.PDMSStsActualReplicas(curService) > 1 {
targetName = choosePDMSToTransferFromMembers(tc, newSet, ordinal)
}

if targetName != "" {
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
if err != nil {
klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
} else {
klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
}
}
}

mngerutils.SetUpgradePartition(newSet, ordinal)
return nil
}

// choosePDMSToTransferFromMembers choose a pdms to transfer primary from members
//
// Assume that current primary ordinal is x, and range is [0, n]
// 1. Find the max suitable ordinal in (x, n], because they have been upgraded
// 2. If no suitable ordinal, find the min suitable ordinal in [0, x) to reduce the count of transfer
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
ns := tc.GetNamespace()
tcName := tc.GetName()
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)

// set ordinal to max ordinal if ordinal isn't exist
if !ordinals.Has(ordinal) {
ordinal = helper.GetMaxPodOrdinal(*newSet.Spec.Replicas, newSet)
}

targetName := ""
list := ordinals.List()
if len(list) == 0 {
return ""
}

// just using pods index for now. TODO: add healthy checker for pdms.
// find the maximum ordinal which is larger than ordinal
if len(list) > int(ordinal)+1 {
targetName = PDMSPodName(tcName, list[len(list)-1], controller.PDMSTrimName(newSet.Name))
}

if targetName == "" && ordinal != 0 {
// find the minimum ordinal which is less than ordinal
targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
}

klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
return targetName
}

// PDMSSupportMicroServicesWithName returns true if the given version of PDMS supports microservices with name.
// related https://github.com/tikv/pd/pull/8157.
var pdMSSupportMicroServicesWithName, _ = cmpver.NewConstraint(cmpver.GreaterOrEqual, "v8.3.0")

type fakePDMSUpgrader struct{}

// NewFakePDMSUpgrader returns a fakePDUpgrader
Expand Down
20 changes: 17 additions & 3 deletions pkg/manager/member/pd_ms_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -44,8 +45,20 @@ func TestPDMSUpgraderUpgrade(t *testing.T) {

testFn := func(test *testcase) {
t.Log(test.name)
upgrader, podInformer := newPDMSUpgrader()
upgrader, pdControl, podInformer := newPDMSUpgrader()
tc := newTidbClusterForPDMSUpgrader()
pdClient := controller.NewFakePDClient(pdControl, tc)
pdMSClient := controller.NewFakePDMSClient(pdControl, tc, "tso")

pdClient.AddReaction(pdapi.GetPDMSPrimaryActionType, func(action *pdapi.Action) (interface{}, error) {
return "upgrader-tso-1", nil
})
pdMSClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) {
return nil, nil
})
pdMSClient.AddReaction(pdapi.PDMSTransferPrimaryActionType, func(action *pdapi.Action) (interface{}, error) {
return nil, nil
})

if test.changeFn != nil {
test.changeFn(tc)
Expand Down Expand Up @@ -218,11 +231,12 @@ func TestPDMSUpgraderUpgrade(t *testing.T) {
}
}

func newPDMSUpgrader() (Upgrader, podinformers.PodInformer) {
func newPDMSUpgrader() (Upgrader, *pdapi.FakePDControl, podinformers.PodInformer) {
fakeDeps := controller.NewFakeDependencies()
pdMSUpgrader := &pdMSUpgrader{deps: fakeDeps}
podInformer := fakeDeps.KubeInformerFactory.Core().V1().Pods()
return pdMSUpgrader, podInformer
pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl)
return pdMSUpgrader, pdControl, podInformer
}

func newStatefulSetForPDMSUpgrader() *apps.StatefulSet {
Expand Down
15 changes: 15 additions & 0 deletions pkg/manager/member/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ func PdName(tcName string, ordinal int32, namespace string, clusterDomain string
return PdPodName(tcName, ordinal)
}

// PDMSName should match the start arg `--name` of pd-server
// See the start script of PDMS in pkg/manager/member/startscript/v2.renderPDMSStartScript
func PDMSName(tcName string, ordinal int32, namespace, clusterDomain string, acrossK8s bool, component string) string {
if len(clusterDomain) > 0 {
return fmt.Sprintf("%s.%s-%s-peer.%s.svc.%s", PDMSPodName(tcName, ordinal, component), component, tcName, namespace, clusterDomain)
}

// clusterDomain is not set
if acrossK8s {
return fmt.Sprintf("%s.%s-%s-peer.%s.svc", PDMSPodName(tcName, ordinal, component), component, tcName, namespace)
}

return PDMSPodName(tcName, ordinal, component)
}

// NeedForceUpgrade check if force upgrade is necessary
func NeedForceUpgrade(ann map[string]string) bool {
// Check if annotation 'pingcap.com/force-upgrade: "true"' is set
Expand Down
48 changes: 48 additions & 0 deletions pkg/pdapi/fake_pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
GetClusterActionType ActionType = "GetCluster"
GetMembersActionType ActionType = "GetMembers"
GetPDMSMembersActionType ActionType = "GetPDMSMembers"
GetPDMSPrimaryActionType ActionType = "GetPDMSPrimary"
GetStoresActionType ActionType = "GetStores"
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
GetStoreActionType ActionType = "GetStore"
Expand All @@ -45,6 +46,7 @@ const (
TransferPDLeaderActionType ActionType = "TransferPDLeader"
GetAutoscalingPlansActionType ActionType = "GetAutoscalingPlans"
GetRecoveringMarkActionType ActionType = "GetRecoveringMark"
PDMSTransferPrimaryActionType ActionType = "PDMSTransferPrimary"
)

type NotFoundReaction struct {
Expand Down Expand Up @@ -78,6 +80,15 @@ func (c *FakePDClient) GetMSMembers(_ string) ([]string, error) {
return result.([]string), nil
}

func (c *FakePDClient) GetMSPrimary(_ string) (string, error) {
action := &Action{}
result, err := c.fakeAPI(GetPDMSPrimaryActionType, action)
if err != nil {
return "", err
}
return result.(string), nil
}

func NewFakePDClient() *FakePDClient {
return &FakePDClient{reactions: map[ActionType]Reaction{}}
}
Expand Down Expand Up @@ -291,3 +302,40 @@ func (c *FakePDClient) GetRecoveringMark() (bool, error) {

return true, nil
}

// FakePDMSClient implements a fake version of PDMSClient.
type FakePDMSClient struct {
reactions map[ActionType]Reaction
}

func NewFakePDMSClient() *FakePDMSClient {
return &FakePDMSClient{reactions: map[ActionType]Reaction{}}
}

func (c *FakePDMSClient) AddReaction(actionType ActionType, reaction Reaction) {
c.reactions[actionType] = reaction
}

// fakeAPI is a small helper for fake API calls
func (c *FakePDMSClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
if reaction, ok := c.reactions[actionType]; ok {
result, err := reaction(action)
if err != nil {
return nil, err
}
return result, nil
}
return nil, &NotFoundReaction{actionType}
}

func (c *FakePDMSClient) GetHealth() error {
action := &Action{}
_, err := c.fakeAPI(GetHealthActionType, action)
return err
}

func (c *FakePDMSClient) TransferPrimary(newPrimary string) error {
action := &Action{Name: newPrimary}
_, err := c.fakeAPI(PDMSTransferPrimaryActionType, action)
return err
}
14 changes: 13 additions & 1 deletion pkg/pdapi/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ type FakePDControl struct {

func NewFakePDControl(secretLister corelisterv1.SecretLister) *FakePDControl {
return &FakePDControl{
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}},
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}, pdMSClients: map[string]PDMSClient{}},
}
}

Expand All @@ -352,3 +352,15 @@ func (fpc *FakePDControl) SetPDClientWithClusterDomain(namespace Namespace, tcNa
func (fpc *FakePDControl) SetPDClientWithAddress(peerURL string, pdclient PDClient) {
fpc.defaultPDControl.pdClients[peerURL] = pdclient
}

func (fpc *FakePDControl) SetPDMSClient(namespace Namespace, tcName, curService string, pdmsclient PDMSClient) {
fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", "", curService, false)] = pdmsclient
}

func (fpc *FakePDControl) SetPDMSClientWithClusterDomain(namespace Namespace, tcName, tcClusterDomain, curService string, pdmsclient PDMSClient) {
fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", tcClusterDomain, curService, false)] = pdmsclient
}

func (fpc *FakePDControl) SetPDMSClientWithAddress(peerURL string, pdmsclient PDMSClient) {
fpc.defaultPDControl.pdMSClients[peerURL] = pdmsclient
}
17 changes: 17 additions & 0 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type PDClient interface {
GetRecoveringMark() (bool, error)
// GetMSMembers returns all PDMS members service-addr from cluster by specific Micro Service
GetMSMembers(service string) ([]string, error)
// GetMSPrimary returns the primary PDMS member service-addr from cluster by specific Micro Service
GetMSPrimary(service string) (string, error)
}

var (
Expand Down Expand Up @@ -341,6 +343,21 @@ func (c *pdClient) GetMSMembers(service string) ([]string, error) {
return addrs, nil
}

func (c *pdClient) GetMSPrimary(service string) (string, error) {
apiURL := fmt.Sprintf("%s/%s/primary/%s", c.url, MicroServicePrefix, service)
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
if err != nil {
return "", err
}
var primary string
err = json.Unmarshal(body, &primary)
if err != nil {
return "", err
}

return primary, nil
}

func (c *pdClient) getStores(apiURL string) (*StoresInfo, error) {
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
if err != nil {
Expand Down
25 changes: 24 additions & 1 deletion pkg/pdapi/pdms_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package pdapi

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
Expand All @@ -27,10 +29,13 @@ import (
type PDMSClient interface {
// GetHealth returns ping result
GetHealth() error
// TransferPrimary transfers the primary to the newPrimary
TransferPrimary(newPrimary string) error
}

var (
pdMSHealthPrefix = "api/v1/health"
pdMSHealthPrefix = "api/v1/health"
pdMSPrimaryTransferPrefix = "api/v1/primary/transfer"
)

// pdMSClient is default implementation of PDClient
Expand Down Expand Up @@ -69,3 +74,21 @@ func (c *pdMSClient) GetHealth() error {
}
return nil
}

func (c *pdMSClient) TransferPrimary(newPrimary string) error {
apiURL := fmt.Sprintf("%s/%s/%s", c.url, c.serviceName, pdMSPrimaryTransferPrefix)
data, err := json.Marshal(struct {
NewPrimary string `json:"new_primary"`
}{
NewPrimary: newPrimary,
})
if err != nil {
return err
}
_, err = httputil.PostBodyOK(c.httpClient, apiURL, bytes.NewBuffer(data))
if err != nil {
return err
}

return nil
}
Loading