diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 690008cb023..531f4855f69 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -316,9 +316,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { CAPath: up.SecurityConfig.CAPath, CertAllowedCN: up.SecurityConfig.CertAllowedCN, } - err = h.capture.GetEtcdClient().CreateChangefeedInfo( + err = ctrl.CreateChangefeed( ctx, upstreamInfo, - info, model.DefaultChangeFeedID(changefeedConfig.ID)) + info) if err != nil { _ = c.Error(err) return diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index cd64a9d74b8..1cc070d7618 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -162,10 +162,9 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { return } - err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, + err = ctrl.CreateChangefeed(ctx, upstreamInfo, - info, - model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID}) + info) if err != nil { needRemoveGCSafePoint = true _ = c.Error(err) diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 7134f0ffc71..e37cb4b4cda 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -202,8 +202,8 @@ func TestCreateChangefeed(t *testing.T) { SinkURI: cfg.SinkURI, }, nil }).AnyTimes() - etcdClient.EXPECT(). - CreateChangefeedInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ctrl.EXPECT(). + CreateChangefeed(gomock.Any(), gomock.Any(), gomock.Any()). Return(cerrors.ErrPDEtcdAPIError).Times(1) cfConfig.SinkURI = mysqlSink @@ -223,8 +223,8 @@ func TestCreateChangefeed(t *testing.T) { helpers.EXPECT(). getEtcdClient(gomock.Any(), gomock.Any()). Return(testEtcdCluster.RandClient(), nil) - etcdClient.EXPECT(). - CreateChangefeedInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ctrl.EXPECT(). + CreateChangefeed(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil). AnyTimes() w = httptest.NewRecorder() diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index b1b2fda90e7..6c1036f03f7 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -122,7 +122,7 @@ type captureImpl struct { cfg *config.SchedulerConfig, ) processor.Manager newOwner func(upstreamManager *upstream.Manager, cfg *config.SchedulerConfig) owner.Owner - newController func(upstreamManager *upstream.Manager, captureInfo *model.CaptureInfo) controller.Controller + newController func(upstreamManager *upstream.Manager, captureInfo *model.CaptureInfo, client etcd.CDCEtcdClient) controller.Controller } // NewCapture returns a new Capture instance @@ -485,7 +485,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev)) - controller := c.newController(c.upstreamManager, c.info) + controller := c.newController(c.upstreamManager, c.info, c.EtcdClient) owner := c.newOwner(c.upstreamManager, c.config.Debug.Scheduler) c.setOwner(owner) diff --git a/cdc/controller/controller.go b/cdc/controller/controller.go index fb34aa2e58f..3ca900588cd 100644 --- a/cdc/controller/controller.go +++ b/cdc/controller/controller.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/version" @@ -56,8 +57,14 @@ type Controller interface { GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) IsChangefeedExists(ctx context.Context, id model.ChangeFeedID) (bool, error) + CreateChangefeed(context.Context, + *model.UpstreamInfo, + *model.ChangeFeedInfo, + ) error } +var _ Controller = &controllerImpl{} + type controllerImpl struct { changefeeds map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState captures map[model.CaptureID]*model.CaptureInfo @@ -78,6 +85,7 @@ type controllerImpl struct { sync.Mutex queue []*controllerJob } + etcdClient etcd.CDCEtcdClient captureInfo *model.CaptureInfo } @@ -86,6 +94,7 @@ type controllerImpl struct { func NewController( upstreamManager *upstream.Manager, captureInfo *model.CaptureInfo, + etcdClient etcd.CDCEtcdClient, ) Controller { return &controllerImpl{ upstreamManager: upstreamManager, @@ -93,6 +102,7 @@ func NewController( lastTickTime: time.Now(), logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate), captureInfo: captureInfo, + etcdClient: etcdClient, } } @@ -281,6 +291,13 @@ func (o *controllerImpl) GetChangefeedOwnerCaptureInfo(id model.ChangeFeedID) *m return o.captureInfo } +func (o *controllerImpl) CreateChangefeed(ctx context.Context, + upstreamInfo *model.UpstreamInfo, + cfInfo *model.ChangeFeedInfo, +) error { + return o.etcdClient.CreateChangefeedInfo(ctx, upstreamInfo, cfInfo) +} + // Export field names for pretty printing. type controllerJob struct { Tp controllerJobType diff --git a/cdc/controller/controller_test.go b/cdc/controller/controller_test.go index 321bb1eb271..e1296b006e7 100644 --- a/cdc/controller/controller_test.go +++ b/cdc/controller/controller_test.go @@ -46,7 +46,7 @@ func createController4Test(ctx cdcContext.Context, } m := upstream.NewManager4Test(pdClient) - o := NewController(m, &model.CaptureInfo{}).(*controllerImpl) + o := NewController(m, &model.CaptureInfo{}, nil).(*controllerImpl) state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID) tester := orchestrator.NewReactorStateTester(t, state, nil) @@ -66,7 +66,7 @@ func createController4Test(ctx cdcContext.Context, func TestUpdateGCSafePoint(t *testing.T) { mockPDClient := &gc.MockPDClient{} m := upstream.NewManager4Test(mockPDClient) - o := NewController(m, &model.CaptureInfo{}).(*controllerImpl) + o := NewController(m, &model.CaptureInfo{}, nil).(*controllerImpl) ctx := cdcContext.NewBackendContext4Test(true) ctx, cancel := cdcContext.WithCancel(ctx) defer cancel() diff --git a/cdc/controller/mock/controller_mock.go b/cdc/controller/mock/controller_mock.go index 6d04ee26b64..91f53eb96e6 100644 --- a/cdc/controller/mock/controller_mock.go +++ b/cdc/controller/mock/controller_mock.go @@ -48,6 +48,20 @@ func (mr *MockControllerMockRecorder) AsyncStop() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsyncStop", reflect.TypeOf((*MockController)(nil).AsyncStop)) } +// CreateChangefeed mocks base method. +func (m *MockController) CreateChangefeed(arg0 context.Context, arg1 *model.UpstreamInfo, arg2 *model.ChangeFeedInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateChangefeed", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateChangefeed indicates an expected call of CreateChangefeed. +func (mr *MockControllerMockRecorder) CreateChangefeed(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateChangefeed", reflect.TypeOf((*MockController)(nil).CreateChangefeed), arg0, arg1, arg2) +} + // GetAllChangeFeedCheckpointTs mocks base method. func (m *MockController) GetAllChangeFeedCheckpointTs(ctx context.Context) (map[model.ChangeFeedID]uint64, error) { m.ctrl.T.Helper() diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 425b00afc96..c5b8371dba1 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -145,7 +145,6 @@ type CDCEtcdClient interface { CreateChangefeedInfo(context.Context, *model.UpstreamInfo, *model.ChangeFeedInfo, - model.ChangeFeedID, ) error UpdateChangefeedAndUpstream(ctx context.Context, @@ -426,8 +425,11 @@ func (c *CDCEtcdClientImpl) RevokeAllLeases(ctx context.Context, leases map[stri func (c *CDCEtcdClientImpl) CreateChangefeedInfo(ctx context.Context, upstreamInfo *model.UpstreamInfo, info *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, ) error { + changeFeedID := model.ChangeFeedID{ + Namespace: info.Namespace, + ID: info.ID, + } infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) jobKey := GetEtcdKeyJob(c.ClusterID, changeFeedID) upstreamInfoKey := CDCKey{ diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 3f7a9bb9b68..15388d68f8f 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -214,11 +214,11 @@ func TestCreateChangefeed(t *testing.T) { upstreamInfo := &model.UpstreamInfo{ID: 1} err := s.client.CreateChangefeedInfo(ctx, - upstreamInfo, detail, model.DefaultChangeFeedID("test-id")) + upstreamInfo, detail) require.NoError(t, err) err = s.client.CreateChangefeedInfo(ctx, - upstreamInfo, detail, model.DefaultChangeFeedID("test-id")) + upstreamInfo, detail) require.True(t, cerror.ErrChangeFeedAlreadyExists.Equal(err)) } diff --git a/pkg/etcd/mock/etcd_client_mock.go b/pkg/etcd/mock/etcd_client_mock.go index 21854ee9482..dd2cf19c6ec 100644 --- a/pkg/etcd/mock/etcd_client_mock.go +++ b/pkg/etcd/mock/etcd_client_mock.go @@ -122,17 +122,17 @@ func (mr *MockCDCEtcdClientMockRecorder) CheckMultipleCDCClusterExist(ctx interf } // CreateChangefeedInfo mocks base method. -func (m *MockCDCEtcdClient) CreateChangefeedInfo(arg0 context.Context, arg1 *model.UpstreamInfo, arg2 *model.ChangeFeedInfo, arg3 model.ChangeFeedID) error { +func (m *MockCDCEtcdClient) CreateChangefeedInfo(arg0 context.Context, arg1 *model.UpstreamInfo, arg2 *model.ChangeFeedInfo) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateChangefeedInfo", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "CreateChangefeedInfo", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // CreateChangefeedInfo indicates an expected call of CreateChangefeedInfo. -func (mr *MockCDCEtcdClientMockRecorder) CreateChangefeedInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockCDCEtcdClientMockRecorder) CreateChangefeedInfo(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateChangefeedInfo", reflect.TypeOf((*MockCDCEtcdClient)(nil).CreateChangefeedInfo), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateChangefeedInfo", reflect.TypeOf((*MockCDCEtcdClient)(nil).CreateChangefeedInfo), arg0, arg1, arg2) } // DeleteCaptureInfo mocks base method.