diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 24ad02d8cae..1f7f2ec6648 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -15,6 +15,7 @@ package v2 import ( "context" + "fmt" "net/http" "sort" "strings" @@ -23,16 +24,17 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" - "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -412,26 +414,31 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { zap.String("changefeedInfo", oldCfInfo.String()), zap.Any("upstreamInfo", OldUpInfo)) - var pdAddrs []string - var credentials *security.Credential - if OldUpInfo != nil { - pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",") - credentials = &security.Credential{ - CAPath: OldUpInfo.CAPath, - CertPath: OldUpInfo.CertPath, - KeyPath: OldUpInfo.KeyPath, - CertAllowedCN: OldUpInfo.CertAllowedCN, - } - } - if len(updateCfConfig.PDAddrs) != 0 { - pdAddrs = updateCfConfig.PDAddrs - credentials = updateCfConfig.PDConfig.toCredential() + upManager, err := h.capture.GetUpstreamManager() + if err != nil { + _ = c.Error(err) + return } - storage, err := h.helpers.createTiStore(pdAddrs, credentials) - if err != nil { - _ = c.Error(errors.Trace(err)) + var storage tidbkv.Storage + // if PDAddrs is not empty, use it to create a new kvstore + // Note: upManager is nil in some unit test cases + if len(updateCfConfig.PDAddrs) != 0 || upManager == nil { + pdAddrs := updateCfConfig.PDAddrs + credentials := updateCfConfig.PDConfig.toCredential() + storage, err = h.helpers.createTiStore(pdAddrs, credentials) + if err != nil { + _ = c.Error(errors.Trace(err)) + } + } else { // get the upstream of the changefeed to get the kvstore + up, ok := upManager.Get(oldCfInfo.UpstreamID) + if !ok { + _ = c.Error(errors.New(fmt.Sprintf("upstream %d not found", oldCfInfo.UpstreamID))) + return + } + storage = up.KVStorage } + newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx, updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs) if err != nil { @@ -665,24 +672,26 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { return } + var pdClient pd.Client + // if PDAddrs is empty, use the default pdClient if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) - return + pdClient = up.PDClient + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + defer pdClient.Close() } - defer pdClient.Close() if err := h.helpers.verifyResumeChangefeedConfig( ctx, diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index c5eb55c5838..37c6b51355f 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -322,14 +322,15 @@ func TestUpdateChangefeed(t *testing.T) { t.Parallel() update := testCase{url: "/api/v2/changefeeds/%s", method: "PUT"} helpers := NewMockAPIV2Helpers(gomock.NewController(t)) - cp := mock_capture.NewMockCapture(gomock.NewController(t)) - apiV2 := NewOpenAPIV2ForTest(cp, helpers) + mockCapture := mock_capture.NewMockCapture(gomock.NewController(t)) + apiV2 := NewOpenAPIV2ForTest(mockCapture, helpers) router := newRouter(apiV2) statusProvider := &mockStatusProvider{} - cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() - cp.EXPECT().IsReady().Return(true).AnyTimes() - cp.EXPECT().IsOwner().Return(true).AnyTimes() + + mockCapture.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() + mockCapture.EXPECT().IsReady().Return(true).AnyTimes() + mockCapture.EXPECT().IsOwner().Return(true).AnyTimes() // case 1 invalid id invalidID := "Invalid_#" @@ -383,7 +384,7 @@ func TestUpdateChangefeed(t *testing.T) { etcdClient.EXPECT(). GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(100)), gomock.Any()). Return(nil, cerrors.ErrUpstreamNotFound).Times(1) - cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() + mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), update.method, @@ -400,7 +401,7 @@ func TestUpdateChangefeed(t *testing.T) { etcdClient.EXPECT(). GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(1)), gomock.Any()). Return(nil, nil).AnyTimes() - cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() + mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), update.method, @@ -437,6 +438,7 @@ func TestUpdateChangefeed(t *testing.T) { createTiStore(gomock.Any(), gomock.Any()). Return(nil, nil). AnyTimes() + mockCapture.EXPECT().GetUpstreamManager().Return(nil, nil).AnyTimes() helpers.EXPECT(). verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused). @@ -456,6 +458,7 @@ func TestUpdateChangefeed(t *testing.T) { require.Equal(t, http.StatusBadRequest, w.Code) // case 7: update transaction failed + mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() helpers.EXPECT(). verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil). @@ -479,6 +482,7 @@ func TestUpdateChangefeed(t *testing.T) { verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(oldCfInfo, &model.UpstreamInfo{}, nil). Times(1) + mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1) @@ -495,6 +499,7 @@ func TestUpdateChangefeed(t *testing.T) { verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(oldCfInfo, &model.UpstreamInfo{}, nil). Times(1) + mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1)