diff --git a/cdc/api/v2/api_helpers_test.go b/cdc/api/v2/api_helpers_test.go index f6874206436..ec4f4a361df 100644 --- a/cdc/api/v2/api_helpers_test.go +++ b/cdc/api/v2/api_helpers_test.go @@ -67,35 +67,35 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { // invalid changefeed id or namespace id cfg.ID = "abdc/sss" - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.ID = "" cfg.Namespace = "abdc/sss" - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.ID = "" cfg.Namespace = "" // changefeed already exists ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(true, nil) - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, cerror.ErrChangeFeedNotExists.GenWithStackByArgs("aaa")) - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.Nil(t, err) require.Equal(t, uint64(123), cfInfo.UpstreamID) cfg.TargetTs = 3 cfg.StartTs = 4 ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.TargetTs = 6 cfg.SinkURI = "aaab://" ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.SinkURI = string([]byte{0x7f, ' '}) ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.StartTs = 0 @@ -103,7 +103,7 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro" cfg.ReplicaConfig.ForceReplicate = false ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) + _, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NoError(t, err) cfg.ReplicaConfig.ForceReplicate = true diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 7a9468be7f2..137d1c1e38d 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -15,6 +15,7 @@ package v2 import ( "context" + "fmt" "net/http" "sort" "strings" @@ -23,16 +24,17 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" - "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -440,26 +442,31 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { zap.String("changefeedInfo", oldCfInfo.String()), zap.Any("upstreamInfo", OldUpInfo)) - var pdAddrs []string - var credentials *security.Credential - if OldUpInfo != nil { - pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",") - credentials = &security.Credential{ - CAPath: OldUpInfo.CAPath, - CertPath: OldUpInfo.CertPath, - KeyPath: OldUpInfo.KeyPath, - CertAllowedCN: OldUpInfo.CertAllowedCN, - } - } - if len(updateCfConfig.PDAddrs) != 0 { - pdAddrs = updateCfConfig.PDAddrs - credentials = updateCfConfig.PDConfig.toCredential() + upManager, err := h.capture.GetUpstreamManager() + if err != nil { + _ = c.Error(err) + return } - storage, err := h.helpers.createTiStore(pdAddrs, credentials) - if err != nil { - _ = c.Error(errors.Trace(err)) + var storage tidbkv.Storage + // if PDAddrs is not empty, use it to create a new kvstore + // Note: upManager is nil in some unit test cases + if len(updateCfConfig.PDAddrs) != 0 || upManager == nil { + pdAddrs := updateCfConfig.PDAddrs + credentials := updateCfConfig.PDConfig.toCredential() + storage, err = h.helpers.createTiStore(pdAddrs, credentials) + if err != nil { + _ = c.Error(errors.Trace(err)) + } + } else { // get the upstream of the changefeed to get the kvstore + up, ok := upManager.Get(oldCfInfo.UpstreamID) + if !ok { + _ = c.Error(errors.New(fmt.Sprintf("upstream %d not found", oldCfInfo.UpstreamID))) + return + } + storage = up.KVStorage } + newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx, updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs) if err != nil { @@ -713,24 +720,26 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { return } + var pdClient pd.Client + // if PDAddrs is empty, use the default pdClient if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) - return + pdClient = up.PDClient + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + defer pdClient.Close() } - defer pdClient.Close() if err := h.helpers.verifyResumeChangefeedConfig( ctx, diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 8bfcc12eb9d..f85221a2f09 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -333,14 +333,14 @@ func TestUpdateChangefeed(t *testing.T) { t.Parallel() update := testCase{url: "/api/v2/changefeeds/%s", method: "PUT"} helpers := NewMockAPIV2Helpers(gomock.NewController(t)) - cp := mock_capture.NewMockCapture(gomock.NewController(t)) - apiV2 := NewOpenAPIV2ForTest(cp, helpers) + mockCapture := mock_capture.NewMockCapture(gomock.NewController(t)) + apiV2 := NewOpenAPIV2ForTest(mockCapture, helpers) router := newRouter(apiV2) statusProvider := &mockStatusProvider{} - cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() - cp.EXPECT().IsReady().Return(true).AnyTimes() - cp.EXPECT().IsController().Return(true).AnyTimes() + mockCapture.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() + mockCapture.EXPECT().IsReady().Return(true).AnyTimes() + mockCapture.EXPECT().IsController().Return(true).AnyTimes() // case 1 invalid id invalidID := "Invalid_#" @@ -394,7 +394,7 @@ func TestUpdateChangefeed(t *testing.T) { etcdClient.EXPECT(). GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(100)), gomock.Any()). Return(nil, cerrors.ErrUpstreamNotFound).Times(1) - cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() + mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), update.method, @@ -411,7 +411,7 @@ func TestUpdateChangefeed(t *testing.T) { etcdClient.EXPECT(). GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(1)), gomock.Any()). Return(nil, nil).AnyTimes() - cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() + mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), update.method, @@ -448,6 +448,7 @@ func TestUpdateChangefeed(t *testing.T) { createTiStore(gomock.Any(), gomock.Any()). Return(nil, nil). AnyTimes() + mockCapture.EXPECT().GetUpstreamManager().Return(nil, nil).AnyTimes() helpers.EXPECT(). verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused). @@ -467,6 +468,7 @@ func TestUpdateChangefeed(t *testing.T) { require.Equal(t, http.StatusBadRequest, w.Code) // case 7: update transaction failed + mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() helpers.EXPECT(). verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil). @@ -490,6 +492,7 @@ func TestUpdateChangefeed(t *testing.T) { verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(oldCfInfo, &model.UpstreamInfo{}, nil). Times(1) + mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1) @@ -506,6 +509,7 @@ func TestUpdateChangefeed(t *testing.T) { verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(oldCfInfo, &model.UpstreamInfo{}, nil). Times(1) + mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1)