From 255d8104c8b9b4f8af191ee69d301ced723ccc3d Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Thu, 16 Nov 2023 14:33:47 +0800 Subject: [PATCH] use upstream pd client in api v2 (#10099) ref pingcap/tiflow#9584 --- cdc/api/v2/changefeed.go | 68 ++++++++++++++++++++--------------- cdc/api/v2/changefeed_test.go | 4 +++ 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 1db74c11884..b63a6fabfb7 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -15,6 +15,7 @@ package v2 import ( "context" + "fmt" "net/http" "sort" "strings" @@ -23,16 +24,17 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" - "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -344,26 +346,30 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { zap.String("changefeedInfo", oldCfInfo.String()), zap.Any("upstreamInfo", OldUpInfo)) - var pdAddrs []string - var credentials *security.Credential - if OldUpInfo != nil { - pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",") - credentials = &security.Credential{ - CAPath: OldUpInfo.CAPath, - CertPath: OldUpInfo.CertPath, - KeyPath: OldUpInfo.KeyPath, - CertAllowedCN: OldUpInfo.CertAllowedCN, - } + upManager, err := h.capture.GetUpstreamManager() + if err != nil { + _ = c.Error(err) + return } - if len(updateCfConfig.PDAddrs) != 0 { - pdAddrs = updateCfConfig.PDAddrs - credentials = updateCfConfig.PDConfig.toCredential() + var storage tidbkv.Storage + // if PDAddrs is not empty, use it to create a new kvstore + // Note: upManager is nil in some unit test cases + if len(updateCfConfig.PDAddrs) != 0 || upManager == nil { + pdAddrs := updateCfConfig.PDAddrs + credentials := updateCfConfig.PDConfig.toCredential() + storage, err = h.helpers.createTiStore(pdAddrs, credentials) + if err != nil { + _ = c.Error(errors.Trace(err)) + } + } else { // get the upstream of the changefeed to get the kvstore + up, ok := upManager.Get(oldCfInfo.UpstreamID) + if !ok { + _ = c.Error(errors.New(fmt.Sprintf("upstream %d not found", oldCfInfo.UpstreamID))) + return + } + storage = up.KVStorage } - storage, err := h.helpers.createTiStore(pdAddrs, credentials) - if err != nil { - _ = c.Error(errors.Trace(err)) - } newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx, updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs) if err != nil { @@ -586,24 +592,28 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { return } + var pdClient pd.Client + // if PDAddrs is empty, use the default pdClien if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) - return + // use the default pdClient + pdClient = up.PDClient + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + // create a new pdClient + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + defer pdClient.Close() } - defer pdClient.Close() if err := h.helpers.verifyResumeChangefeedConfig( ctx, diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 31bcbd4f79f..61830b1bc84 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -426,6 +426,7 @@ func TestUpdateChangefeed(t *testing.T) { createTiStore(gomock.Any(), gomock.Any()). Return(nil, nil). AnyTimes() + cp.EXPECT().GetUpstreamManager().Return(nil, nil).AnyTimes() helpers.EXPECT(). verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused). @@ -445,6 +446,7 @@ func TestUpdateChangefeed(t *testing.T) { require.Equal(t, http.StatusBadRequest, w.Code) // case 7: update transaction failed + cp.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() helpers.EXPECT(). verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil). @@ -468,6 +470,7 @@ func TestUpdateChangefeed(t *testing.T) { verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(oldCfInfo, &model.UpstreamInfo{}, nil). Times(1) + cp.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1) @@ -484,6 +487,7 @@ func TestUpdateChangefeed(t *testing.T) { verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(oldCfInfo, &model.UpstreamInfo{}, nil). Times(1) + cp.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1)