diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 59230dd7674..5a58446ec67 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -517,12 +517,12 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { ctx, cancelOwner := context.WithCancel(ctx) ownerCtx := cdcContext.NewContext(ctx, newGlobalVars) g.Go(func() error { - return c.runEtcdWorker(ownerCtx, owner, + return c.runEtcdWorker(ownerCtx, owner.(orchestrator.Reactor), orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL), ownerFlushInterval, util.RoleOwner.String()) }) g.Go(func() error { - er := c.runEtcdWorker(ownerCtx, controller, + er := c.runEtcdWorker(ownerCtx, controller.(orchestrator.Reactor), globalState, // todo: do not use owner flush interval ownerFlushInterval, util.RoleController.String()) diff --git a/cdc/controller/controller.go b/cdc/controller/controller.go index 3ca900588cd..20bbf62f795 100644 --- a/cdc/controller/controller.go +++ b/cdc/controller/controller.go @@ -45,7 +45,6 @@ const versionInconsistentLogRate = 1 // Controller is a manager to schedule changefeeds type Controller interface { - orchestrator.Reactor AsyncStop() GetChangefeedOwnerCaptureInfo(id model.ChangeFeedID) *model.CaptureInfo GetAllChangeFeedInfo(ctx context.Context) ( @@ -63,7 +62,10 @@ type Controller interface { ) error } -var _ Controller = &controllerImpl{} +var ( + _ orchestrator.Reactor = &controllerImpl{} + _ Controller = &controllerImpl{} +) type controllerImpl struct { changefeeds map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState diff --git a/cdc/controller/mock/controller_mock.go b/cdc/controller/mock/controller_mock.go index 91f53eb96e6..1d9b8a423ef 100644 --- a/cdc/controller/mock/controller_mock.go +++ b/cdc/controller/mock/controller_mock.go @@ -10,7 +10,6 @@ import ( gomock "github.com/golang/mock/gomock" model "github.com/pingcap/tiflow/cdc/model" - orchestrator "github.com/pingcap/tiflow/pkg/orchestrator" ) // 
MockController is a mock of Controller interface. @@ -150,18 +149,3 @@ func (mr *MockControllerMockRecorder) IsChangefeedExists(ctx, id interface{}) *g mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsChangefeedExists", reflect.TypeOf((*MockController)(nil).IsChangefeedExists), ctx, id) } - -// Tick mocks base method. -func (m *MockController) Tick(ctx context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Tick", ctx, state) - ret0, _ := ret[0].(orchestrator.ReactorState) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Tick indicates an expected call of Tick. -func (mr *MockControllerMockRecorder) Tick(ctx, state interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockController)(nil).Tick), ctx, state) -} diff --git a/cdc/model/errors_test.go b/cdc/model/errors_test.go index 74559dc1937..64bf21ccd31 100644 --- a/cdc/model/errors_test.go +++ b/cdc/model/errors_test.go @@ -15,12 +15,15 @@ package model import ( "testing" + "time" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) func TestIsChangefeedNotRetryError(t *testing.T) { + t.Parallel() + cases := []struct { err RunningError result bool @@ -55,3 +58,50 @@ func TestIsChangefeedNotRetryError(t *testing.T) { require.Equal(t, c.result, c.err.ShouldFailChangefeed()) } } + +func TestRunningErrorScan(t *testing.T) { + t.Parallel() + + timeNow := time.Now() + timeNowJSON, err := timeNow.MarshalJSON() + require.Nil(t, err) + + newTime := time.Time{} + err = newTime.UnmarshalJSON(timeNowJSON) + require.Nil(t, err) + // timeNow: 2023-10-13 16:48:08.345614 +0800 CST m=+0.027639459 + // newTime: 2023-10-13 16:48:08.345614 +0800 CST + require.NotEqual(t, timeNow, newTime) + + cases := []struct { + err RunningError + result string + }{ + { + RunningError{ + Time: timeNow, + Addr: "", + Code: 
string(cerror.ErrAPIGetPDClientFailed.RFCCode()), + Message: cerror.ErrAPIGetPDClientFailed.Error(), + }, + `{"time":` + string(timeNowJSON) + + `,"addr":"","code":"CDC:ErrAPIGetPDClientFailed","message":"` + + cerror.ErrAPIGetPDClientFailed.Error() + `"}`, + }, + } + + for _, c := range cases { + v, err := c.err.Value() + b, ok := v.([]byte) + require.True(t, ok) + require.Nil(t, err) + require.Equal(t, c.result, string(b)) + + var err2 RunningError + err = err2.Scan(b) + require.Nil(t, err) + require.Equal(t, c.err.Addr, err2.Addr) + require.Equal(t, c.err.Code, err2.Code) + require.Equal(t, c.err.Message, err2.Message) + } +} diff --git a/cdc/owner/mock/owner_mock.go b/cdc/owner/mock/owner_mock.go index 7094bdafce3..06f27cbe98b 100644 --- a/cdc/owner/mock/owner_mock.go +++ b/cdc/owner/mock/owner_mock.go @@ -13,7 +13,6 @@ import ( model "github.com/pingcap/tiflow/cdc/model" owner "github.com/pingcap/tiflow/cdc/owner" scheduler "github.com/pingcap/tiflow/cdc/scheduler" - orchestrator "github.com/pingcap/tiflow/pkg/orchestrator" ) // MockOwner is a mock of Owner interface. @@ -111,21 +110,6 @@ func (mr *MockOwnerMockRecorder) ScheduleTable(cfID, toCapture, tableID, done in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleTable", reflect.TypeOf((*MockOwner)(nil).ScheduleTable), cfID, toCapture, tableID, done) } -// Tick mocks base method. -func (m *MockOwner) Tick(ctx context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Tick", ctx, state) - ret0, _ := ret[0].(orchestrator.ReactorState) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Tick indicates an expected call of Tick. -func (mr *MockOwnerMockRecorder) Tick(ctx, state interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockOwner)(nil).Tick), ctx, state) -} - // UpdateChangefeed mocks base method. 
func (m *MockOwner) UpdateChangefeed(ctx context.Context, changeFeedInfo *model.ChangeFeedInfo) error { m.ctrl.T.Helper() diff --git a/cdc/owner/mock/status_provider_mock.go b/cdc/owner/mock/status_provider_mock.go index 22442877668..2cc69039b23 100644 --- a/cdc/owner/mock/status_provider_mock.go +++ b/cdc/owner/mock/status_provider_mock.go @@ -35,36 +35,6 @@ func (m *MockStatusProvider) EXPECT() *MockStatusProviderMockRecorder { return m.recorder } -// GetAllChangeFeedInfo mocks base method. -func (m *MockStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAllChangeFeedInfo", ctx) - ret0, _ := ret[0].(map[model.ChangeFeedID]*model.ChangeFeedInfo) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetAllChangeFeedInfo indicates an expected call of GetAllChangeFeedInfo. -func (mr *MockStatusProviderMockRecorder) GetAllChangeFeedInfo(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllChangeFeedInfo", reflect.TypeOf((*MockStatusProvider)(nil).GetAllChangeFeedInfo), ctx) -} - -// GetAllChangeFeedStatuses mocks base method. -func (m *MockStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAllChangeFeedStatuses", ctx) - ret0, _ := ret[0].(map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetAllChangeFeedStatuses indicates an expected call of GetAllChangeFeedStatuses. -func (mr *MockStatusProviderMockRecorder) GetAllChangeFeedStatuses(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllChangeFeedStatuses", reflect.TypeOf((*MockStatusProvider)(nil).GetAllChangeFeedStatuses), ctx) -} - // GetAllTaskStatuses mocks base method. 
func (m *MockStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) { m.ctrl.T.Helper() diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index de05c6f0aa1..966e26da00f 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -81,7 +81,6 @@ type ownerJob struct { // // The interface is thread-safe, except for Tick, it's only used by etcd worker. type Owner interface { - orchestrator.Reactor EnqueueJob(adminJob model.AdminJob, done chan<- error) RebalanceTables(cfID model.ChangeFeedID, done chan<- error) ScheduleTable( @@ -132,7 +131,10 @@ type ownerImpl struct { cfg *config.SchedulerConfig } -var _ Owner = &ownerImpl{} +var ( + _ orchestrator.Reactor = &ownerImpl{} + _ Owner = &ownerImpl{} +) // NewOwner creates a new Owner func NewOwner( @@ -585,31 +587,31 @@ func (o *ownerImpl) handleJobs(ctx context.Context) { func (o *ownerImpl) handleQueries(query *Query) error { switch query.Tp { - case QueryAllChangeFeedStatuses: - ret := map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI{} - for cfID, cfReactor := range o.changefeeds { - ret[cfID] = &model.ChangeFeedStatusForAPI{} - if cfReactor.latestStatus == nil { - continue - } - ret[cfID].ResolvedTs = cfReactor.resolvedTs - ret[cfID].CheckpointTs = cfReactor.latestStatus.CheckpointTs + case QueryChangeFeedStatuses: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] + if !ok { + query.Data = nil + return nil } + ret := &model.ChangeFeedStatusForAPI{} + ret.ResolvedTs = cfReactor.resolvedTs + ret.CheckpointTs = cfReactor.latestStatus.CheckpointTs query.Data = ret - case QueryAllChangeFeedInfo: - ret := map[model.ChangeFeedID]*model.ChangeFeedInfo{} - for cfID, cfReactor := range o.changefeeds { - if cfReactor.latestInfo == nil { - ret[cfID] = &model.ChangeFeedInfo{} - continue - } + case QueryChangefeedInfo: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] + if !ok { + query.Data = nil + return nil + } + if cfReactor.latestInfo 
== nil { + query.Data = &model.ChangeFeedInfo{} + } else { var err error - ret[cfID], err = cfReactor.latestInfo.Clone() + query.Data, err = cfReactor.latestInfo.Clone() if err != nil { return errors.Trace(err) } } - query.Data = ret case QueryAllTaskStatuses: cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 9f9dcd1f198..0fdae64e1ac 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -425,11 +425,20 @@ func TestHandleJobsDontBlock(t *testing.T) { ctx1, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - var errIn error var infos map[model.ChangeFeedID]*model.ChangeFeedInfo done := make(chan struct{}) go func() { - infos, errIn = statusProvider.GetAllChangeFeedInfo(ctx1) + info1, err := statusProvider.GetChangeFeedInfo(ctx1, cf1) + require.Nil(t, err) + info2, err := statusProvider.GetChangeFeedInfo(ctx1, cf2) + require.Nil(t, err) + info3, err := statusProvider.GetChangeFeedInfo(ctx1, cf3) + require.Nil(t, err) + infos = map[model.ChangeFeedID]*model.ChangeFeedInfo{ + cf1: info1, + cf2: info2, + cf3: info3, + } done <- struct{}{} }() @@ -447,7 +456,6 @@ WorkLoop: require.Nil(t, err) } } - require.Nil(t, errIn) require.NotNil(t, infos[cf1]) require.NotNil(t, infos[cf2]) require.NotNil(t, infos[cf3]) diff --git a/cdc/owner/status_provider.go b/cdc/owner/status_provider.go index 9cfdff04ff5..f1b6a057ddd 100644 --- a/cdc/owner/status_provider.go +++ b/cdc/owner/status_provider.go @@ -24,15 +24,9 @@ import ( // StatusProvider provide some func to get meta-information from owner // The interface is thread-safe. type StatusProvider interface { - // GetAllChangeFeedStatuses returns all changefeeds' runtime status. - GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI, error) - // GetChangeFeedStatus returns a changefeeds' runtime status. 
GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatusForAPI, error) - // GetAllChangeFeedInfo returns all changefeeds' info. - GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) - // GetChangeFeedInfo returns a changefeeds' info. GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) @@ -55,12 +49,8 @@ type StatusProvider interface { type QueryType int32 const ( - // QueryAllChangeFeedStatuses query all changefeed status. - QueryAllChangeFeedStatuses QueryType = iota - // QueryAllChangeFeedInfo is the type of query all changefeed info. - QueryAllChangeFeedInfo // QueryAllTaskStatuses is the type of query all task statuses. - QueryAllTaskStatuses + QueryAllTaskStatuses QueryType = iota // QueryProcessors is the type of query processors. QueryProcessors // QueryCaptures is the type of query captures info. @@ -68,7 +58,11 @@ const ( // QueryHealth is the type of query cluster health info. QueryHealth // QueryOwner is the type of query changefeed owner - QueryOwner = 6 + QueryOwner + // QueryChangefeedInfo is the type of query changefeed info + QueryChangefeedInfo + // QueryChangeFeedStatuses is the type of query changefeed status + QueryChangeFeedStatuses ) // Query wraps query command and return results. 
@@ -88,56 +82,36 @@ type ownerStatusProvider struct { owner Owner } -func (p *ownerStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) ( - map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI, error, -) { - query := &Query{ - Tp: QueryAllChangeFeedStatuses, - } - if err := p.sendQueryToOwner(ctx, query); err != nil { - return nil, errors.Trace(err) - } - return query.Data.(map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI), nil -} - func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID, ) (*model.ChangeFeedStatusForAPI, error) { - statuses, err := p.GetAllChangeFeedStatuses(ctx) - if err != nil { - return nil, errors.Trace(err) - } - status, exist := statuses[changefeedID] - if !exist { - return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID) - } - return status, nil -} - -func (p *ownerStatusProvider) GetAllChangeFeedInfo(ctx context.Context) ( - map[model.ChangeFeedID]*model.ChangeFeedInfo, error, -) { query := &Query{ - Tp: QueryAllChangeFeedInfo, + Tp: QueryChangeFeedStatuses, + ChangeFeedID: changefeedID, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.Data.(map[model.ChangeFeedID]*model.ChangeFeedInfo), nil + if query.Data == nil { + return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID) + } + return query.Data.(*model.ChangeFeedStatusForAPI), nil } func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID, ) (*model.ChangeFeedInfo, error) { - infos, err := p.GetAllChangeFeedInfo(ctx) - if err != nil { + query := &Query{ + Tp: QueryChangefeedInfo, + ChangeFeedID: changefeedID, + } + if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - info, exist := infos[changefeedID] - if !exist { + if query.Data == nil { return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID) } - return info, nil + return 
query.Data.(*model.ChangeFeedInfo), nil } func (p *ownerStatusProvider) GetAllTaskStatuses(ctx context.Context, diff --git a/cdc/server/server.go b/cdc/server/server.go index 3180ee0dc10..f57ab945a11 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory" + capturev2 "github.com/pingcap/tiflow/cdcv2/capture" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" @@ -197,8 +198,13 @@ func (s *server) prepare(ctx context.Context) error { s.createSortEngineFactory() s.setMemoryLimit() - s.capture = capture.NewCapture(s.pdEndpoints, cdcEtcdClient, - s.grpcService, s.sortEngineFactory, s.pdClient) + if conf.Debug.CDCV2.Enable { + s.capture = capturev2.NewCapture(s.pdEndpoints, cdcEtcdClient, + s.grpcService, s.sortEngineFactory, s.pdClient) + } else { + s.capture = capture.NewCapture(s.pdEndpoints, cdcEtcdClient, + s.grpcService, s.sortEngineFactory, s.pdClient) + } return nil } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index f0eb05709a2..447d1a00710 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "net/url" + "strings" "time" dmysql "github.com/go-sql-driver/mysql" @@ -651,20 +652,17 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { func (s *mysqlBackend) multiStmtExecute( ctx context.Context, dmls *preparedDMLs, tx *sql.Tx, writeTimeout time.Duration, ) error { - start := time.Now() - multiStmtSQL := "" - multiStmtArgs := []any{} - for i, query := range dmls.sqls { - multiStmtSQL += query - if i != len(dmls.sqls)-1 { - multiStmtSQL += ";" - } - multiStmtArgs = append(multiStmtArgs, dmls.values[i]...) 
+ var multiStmtArgs []any + for _, value := range dmls.values { + multiStmtArgs = append(multiStmtArgs, value...) } + multiStmtSQL := strings.Join(dmls.sqls, ";") + log.Debug("exec row", zap.Int("workerID", s.workerID), zap.String("sql", multiStmtSQL), zap.Any("args", multiStmtArgs)) ctx, cancel := context.WithTimeout(ctx, writeTimeout) defer cancel() + start := time.Now() _, execError := tx.ExecContext(ctx, multiStmtSQL, multiStmtArgs...) if execError != nil { err := logDMLTxnErr( diff --git a/cdcv2/capture/capture.go b/cdcv2/capture/capture.go new file mode 100644 index 00000000000..2c38c075168 --- /dev/null +++ b/cdcv2/capture/capture.go @@ -0,0 +1,345 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package capture + +import ( + "context" + "database/sql" + "io" + "sync" + + "github.com/google/uuid" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/controller" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/owner" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory" + controllerv2 "github.com/pingcap/tiflow/cdcv2/controller" + "github.com/pingcap/tiflow/cdcv2/metadata" + msql "github.com/pingcap/tiflow/cdcv2/metadata/sql" + "github.com/pingcap/tiflow/pkg/config" + cdcContext "github.com/pingcap/tiflow/pkg/context" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/version" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" +) + +// NewCapture returns a new Capture instance +func NewCapture(pdEndpoints []string, + etcdClient etcd.CDCEtcdClient, + grpcService *p2p.ServerWrapper, + sortEngineMangerFactory *factory.SortEngineFactory, + pdClient pd.Client, +) capture.Capture { + return &captureImpl{ + config: config.GetGlobalServerConfig(), + liveness: model.LivenessCaptureAlive, + EtcdClient: etcdClient, + grpcService: grpcService, + cancel: func() {}, + pdEndpoints: pdEndpoints, + info: &model.CaptureInfo{}, + sortEngineFactory: sortEngineMangerFactory, + pdClient: pdClient, + } +} + +type captureImpl struct { + // captureMu is used to protect the capture info and processorManager. 
+ captureMu sync.Mutex + info *model.CaptureInfo + liveness model.Liveness + config *config.ServerConfig + + pdClient pd.Client + pdEndpoints []string + ownerMu sync.Mutex + owner owner.Owner + controller controller.Controller + upstreamManager *upstream.Manager + + EtcdClient etcd.CDCEtcdClient + + sortEngineFactory *factory.SortEngineFactory + + // MessageServer is the receiver of the messages from the other nodes. + // It should be recreated each time the capture is restarted. + MessageServer *p2p.MessageServer + + // MessageRouter manages the clients to send messages to all peers. + MessageRouter p2p.MessageRouter + + // grpcService is a wrapper that can hold a MessageServer. + // The instance should last for the whole life of the server, + // regardless of server restarting. + // This design is to solve the problem that grpc-go cannot gracefully + // unregister a service. + grpcService *p2p.ServerWrapper + + cancel context.CancelFunc + + storage *sql.DB + captureObservation metadata.CaptureObservation + controllerObserver metadata.ControllerObservation +} + +func (c *captureImpl) Run(ctx context.Context) error { + defer log.Info("the capture routine has exited") + // Limit the frequency of reset capture to avoid frequent recreating of resources + rl := rate.NewLimiter(0.05, 2) + for { + select { + case <-ctx.Done(): + return nil + default: + } + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + err := rl.Wait(ctx) + if err != nil { + if errors.Cause(err) == context.Canceled { + return nil + } + return errors.Trace(err) + } + err = c.run(ctx) + // if capture suicided, reset the capture and run again. + // if the canceled error throw, there are two possible scenarios: + // 1. the internal context canceled, it means some error happened in + // the internal, and the routine is exited, we should restart + // the capture. + // 2. 
the parent context canceled, it means that the caller of + // the capture hope the capture to exit, and this loop will return + // in the above `select` block. + // if there are some **internal** context deadline exceeded (IO/network + // timeout), reset the capture and run again. + // + // TODO: make sure the internal cancel should return the real error + // instead of context.Canceled. + if cerror.ErrCaptureSuicide.Equal(err) || + context.Canceled == errors.Cause(err) || + context.DeadlineExceeded == errors.Cause(err) { + log.Info("capture recovered", zap.String("captureID", c.info.ID)) + continue + } + return errors.Trace(err) + } +} + +func (c *captureImpl) run(stdCtx context.Context) error { + err := c.reset(stdCtx) + if err != nil { + log.Error("reset capture failed", zap.Error(err)) + return errors.Trace(err) + } + defer func() { + c.Close() + c.grpcService.Reset(nil) + }() + + g, stdCtx := errgroup.WithContext(stdCtx) + + ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ + CaptureInfo: c.info, + EtcdClient: c.EtcdClient, + MessageServer: c.MessageServer, + MessageRouter: c.MessageRouter, + SortEngineFactory: c.sortEngineFactory, + }) + + g.Go(func() error { + return c.MessageServer.Run(ctx, c.MessageRouter.GetLocalChannel()) + }) + + g.Go(func() error { + return c.captureObservation.Run(ctx, + func(ctx context.Context, + controllerObserver metadata.ControllerObservation, + ) error { + c.controllerObserver = controllerObserver + ctrl := controllerv2.NewController( + c.upstreamManager, + c.info, controllerObserver, c.captureObservation) + c.controller = ctrl + return ctrl.Run(ctx) + }) + }) + return errors.Trace(g.Wait()) +} + +// reset the capture before run it. 
+func (c *captureImpl) reset(ctx context.Context) error { + c.captureMu.Lock() + defer c.captureMu.Unlock() + c.info = &model.CaptureInfo{ + ID: uuid.New().String(), + AdvertiseAddr: c.config.AdvertiseAddr, + Version: version.ReleaseVersion, + } + + if c.upstreamManager != nil { + c.upstreamManager.Close() + } + c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID()) + _, err := c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security, c.pdClient) + if err != nil { + return errors.Trace(err) + } + + c.grpcService.Reset(nil) + + if c.MessageRouter != nil { + c.MessageRouter.Close() + c.MessageRouter = nil + } + messageServerConfig := c.config.Debug.Messages.ToMessageServerConfig() + c.MessageServer = p2p.NewMessageServer(c.info.ID, messageServerConfig) + c.grpcService.Reset(c.MessageServer) + + messageClientConfig := c.config.Debug.Messages.ToMessageClientConfig() + + // Puts the advertise-addr of the local node to the client config. + // This is for metrics purpose only, so that the receiver knows which + // node the connections are from. + advertiseAddr := c.config.AdvertiseAddr + messageClientConfig.AdvertisedAddr = advertiseAddr + + c.MessageRouter = p2p.NewMessageRouterWithLocalClient(c.info.ID, c.config.Security, messageClientConfig) + + dsnConfig, err := c.config.Debug.CDCV2.MetaStoreConfig.GenDSN() + if err != nil { + return errors.Trace(err) + } + c.storage, err = sql.Open("mysql", dsnConfig.FormatDSN()) + if err != nil { + return errors.Trace(err) + } + captureDB, err := msql.NewCaptureObservation(c.storage, c.info) + c.captureObservation = captureDB + if err != nil { + return errors.Trace(err) + } + log.Info("capture initialized", zap.Any("capture", c.info)) + return nil +} + +func (c *captureImpl) Close() { + defer c.cancel() + // Safety: Here we mainly want to stop the owner + // and ignore it if the owner does not exist or is not set. 
+ o, _ := c.GetOwner() + if o != nil { + o.AsyncStop() + log.Info("owner closed", zap.String("captureID", c.info.ID)) + } + + c.captureMu.Lock() + defer c.captureMu.Unlock() + + c.grpcService.Reset(nil) + if c.MessageRouter != nil { + c.MessageRouter.Close() + c.MessageRouter = nil + } + log.Info("message router closed", zap.String("captureID", c.info.ID)) +} + +// Drain does nothing for now. +func (c *captureImpl) Drain() <-chan struct{} { + done := make(chan struct{}) + close(done) + return done +} + +func (c *captureImpl) Liveness() model.Liveness { + return c.liveness +} + +func (c *captureImpl) GetOwner() (owner.Owner, error) { + c.ownerMu.Lock() + defer c.ownerMu.Unlock() + return c.owner, nil +} + +func (c *captureImpl) GetController() (controller.Controller, error) { + c.ownerMu.Lock() + defer c.ownerMu.Unlock() + if c.owner == nil { + return nil, cerror.ErrNotOwner.GenWithStackByArgs() + } + return c.controller, nil +} + +func (c *captureImpl) GetControllerCaptureInfo(ctx context.Context) (*model.CaptureInfo, error) { + panic("implement me") +} + +func (c *captureImpl) IsController() bool { + c.captureMu.Lock() + defer c.captureMu.Unlock() + return c.controller != nil +} + +func (c *captureImpl) Info() (model.CaptureInfo, error) { + c.captureMu.Lock() + defer c.captureMu.Unlock() + // when c.reset has not been called yet, c.info is nil. 
+ if c.info != nil { + return *c.info, nil + } + return model.CaptureInfo{}, cerror.ErrCaptureNotInitialized.GenWithStackByArgs() +} + +func (c *captureImpl) StatusProvider() owner.StatusProvider { + c.ownerMu.Lock() + defer c.ownerMu.Unlock() + if c.owner == nil { + return nil + } + panic("implement me") +} + +func (c *captureImpl) WriteDebugInfo(ctx context.Context, w io.Writer) { + panic("implement me") +} + +func (c *captureImpl) GetUpstreamManager() (*upstream.Manager, error) { + if c.upstreamManager == nil { + return nil, cerror.ErrUpstreamManagerNotReady + } + return c.upstreamManager, nil +} + +func (c *captureImpl) GetEtcdClient() etcd.CDCEtcdClient { + return c.EtcdClient +} + +func (c *captureImpl) IsReady() bool { + panic("implement me") +} + +func (c *captureImpl) GetUpstreamInfo(ctx context.Context, + id model.UpstreamID, + s string, +) (*model.UpstreamInfo, error) { + panic("implement me") +} diff --git a/cdcv2/controller/controller.go b/cdcv2/controller/controller.go new file mode 100644 index 00000000000..6c3b5c10798 --- /dev/null +++ b/cdcv2/controller/controller.go @@ -0,0 +1,140 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//nolint:unused +package controller + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/controller" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdcv2/metadata" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/orchestrator" + "github.com/pingcap/tiflow/pkg/upstream" + "go.uber.org/zap" +) + +var _ controller.Controller = &controllerImpl{} + +type controllerImpl struct { + captureInfo *model.CaptureInfo + captures map[model.CaptureID]*model.CaptureInfo + upstreamManager *upstream.Manager + + lastTickTime time.Time + // bootstrapped specifies whether the controller has been initialized. + // This will only be done when the controller starts the first Tick. + // NOTICE: Do not use it in a method other than tick unexpectedly, + // as it is not a thread-safe value. + bootstrapped bool + closed int32 + + controllerObservation metadata.ControllerObservation + captureObservation metadata.CaptureObservation +} + +// NewController creates a new Controller +func NewController( + upstreamManager *upstream.Manager, + captureInfo *model.CaptureInfo, + controllerObservation metadata.ControllerObservation, + captureObservation metadata.CaptureObservation, +) *controllerImpl { + return &controllerImpl{ + upstreamManager: upstreamManager, + captures: map[model.CaptureID]*model.CaptureInfo{}, + lastTickTime: time.Now(), + captureInfo: captureInfo, + controllerObservation: controllerObservation, + captureObservation: captureObservation, + } +} + +func (o *controllerImpl) Run(stdCtx context.Context) error { + tick := time.Tick(time.Second * 5) + for { + select { + case <-stdCtx.Done(): + return stdCtx.Err() + case <-tick: + changefeeds, captures, err := o.controllerObservation.ScheduleSnapshot() + if err != nil { + log.Error("failed to get snapshot", zap.Error(err)) + } + log.Info("controller snapshot", + zap.Int("changefeeds", len(changefeeds)), + zap.Int("captures", 
len(captures))) + + // if closed, exit the etcd worker loop + if atomic.LoadInt32(&o.closed) != 0 { + return cerror.ErrReactorFinished.GenWithStackByArgs() + } + } + } +} + +func (o *controllerImpl) Tick(ctx context.Context, + state orchestrator.ReactorState, +) (nextState orchestrator.ReactorState, err error) { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) AsyncStop() { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) GetChangefeedOwnerCaptureInfo(id model.ChangeFeedID) *model.CaptureInfo { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) GetAllChangeFeedCheckpointTs(ctx context.Context) (map[model.ChangeFeedID]uint64, error) { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) IsChangefeedExists(ctx context.Context, id model.ChangeFeedID) (bool, error) { + // TODO implement me + panic("implement me") +} + +func (o *controllerImpl) CreateChangefeed( + ctx context.Context, + upstreamInfo *model.UpstreamInfo, + changefeedInfo *model.ChangeFeedInfo, +) error { + // TODO implement me + panic("implement me") +} diff --git a/cdcv2/metadata/model.go b/cdcv2/metadata/model.go index 10c35cfb632..2a89f2d6491 100644 --- a/cdcv2/metadata/model.go +++ b/cdcv2/metadata/model.go @@ -165,7 +165,7 @@ func (s *SchedState) fromString(str string) error { // Value implements the driver.Valuer interface. 
func (s SchedState) Value() (driver.Value, error) { - return s.toString(), nil + return []byte(s.toString()), nil } // Scan implements the sql.Scanner interface. diff --git a/cdcv2/metadata/sql/client.go b/cdcv2/metadata/sql/client.go index 885f7bdd8a1..d4f27a7a4c5 100644 --- a/cdcv2/metadata/sql/client.go +++ b/cdcv2/metadata/sql/client.go @@ -171,8 +171,8 @@ type progressClient[T TxnContext] interface { createProgress(tx T, pr *ProgressDO) error deleteProgress(tx T, pr *ProgressDO) error updateProgress(tx T, pr *ProgressDO) error - queryProgresss(tx T) ([]*ProgressDO, error) - queryProgresssByUpdateAt(tx T, lastUpdateAt time.Time) ([]*ProgressDO, error) + queryProgresses(tx T) ([]*ProgressDO, error) + queryProgressesByUpdateAt(tx T, lastUpdateAt time.Time) ([]*ProgressDO, error) queryProgressByCaptureID(tx T, id string) (*ProgressDO, error) queryProgressByCaptureIDsWithLock(tx T, ids []string) ([]*ProgressDO, error) } diff --git a/cdcv2/metadata/sql/client_orm.go b/cdcv2/metadata/sql/client_orm.go index d8643c53242..13bc2dcb3e3 100644 --- a/cdcv2/metadata/sql/client_orm.go +++ b/cdcv2/metadata/sql/client_orm.go @@ -133,7 +133,7 @@ func (c *ormClient) queryUpstreamsByUpdateAt(tx *gorm.DB, lastUpdateAt time.Time //nolint:unused func (c *ormClient) queryUpstreamByID(tx *gorm.DB, id uint64) (*UpstreamDO, error) { up := &UpstreamDO{} - ret := tx.Where("id = ?", id).First(up) + ret := tx.Where("id = ?", id).Limit(1).Find(up) if err := handleSingleOpErr(ret, 1, "QueryUpstreamsByUpdateAt"); err != nil { return nil, errors.Trace(err) } @@ -224,7 +224,7 @@ func (c *ormClient) queryChangefeedInfosByUpdateAt(tx *gorm.DB, lastUpdateAt tim // nolint:unused func (c *ormClient) queryChangefeedInfosByUUIDs(tx *gorm.DB, uuids ...uint64) ([]*ChangefeedInfoDO, error) { infos := []*ChangefeedInfoDO{} - ret := tx.Where("uuid in (?)", uuids).Find(&infos) + ret := tx.Where("uuid IN (?)", uuids).Find(&infos) if err := handleSingleOpErr(ret, int64(len(uuids)), 
"QueryChangefeedInfosByUUIDs"); err != nil { // TODO: optimize the behavior when some uuids are not found. return infos, errors.Trace(err) @@ -237,7 +237,7 @@ func (c *ormClient) queryChangefeedInfosByUUIDs(tx *gorm.DB, uuids ...uint64) ([ //nolint:unused func (c *ormClient) queryChangefeedInfoByUUID(tx *gorm.DB, uuid uint64) (*ChangefeedInfoDO, error) { info := &ChangefeedInfoDO{} - ret := tx.Where("uuid = ?", uuid).First(info) + ret := tx.Where("uuid = ?", uuid).Limit(1).Find(info) // TODO(CharlesCheung): handle record not found error. if err := handleSingleOpErr(ret, 1, "QueryChangefeedInfoByUUID"); err != nil { @@ -317,7 +317,7 @@ func (c *ormClient) queryChangefeedStatesByUpdateAt(tx *gorm.DB, lastUpdateAt ti //nolint:unused func (c *ormClient) queryChangefeedStateByUUID(tx *gorm.DB, uuid uint64) (*ChangefeedStateDO, error) { state := &ChangefeedStateDO{} - ret := tx.Where("changefeed_uuid = ?", uuid).First(state) + ret := tx.Where("changefeed_uuid = ?", uuid).Limit(1).Find(state) if err := handleSingleOpErr(ret, 1, "QueryChangefeedStateByUUID"); err != nil { return nil, errors.Trace(err) } @@ -344,7 +344,7 @@ func (c *ormClient) queryChangefeedStateByUUIDWithLock(tx *gorm.DB, uuid uint64) Clauses(clause.Locking{ Strength: "SHARE", Table: clause.Table{Name: clause.CurrentTable}, - }).First(state) + }).Limit(1).Find(state) if err := handleSingleOpErr(ret, 1, "QueryChangefeedStateByUUIDWithLock"); err != nil { return nil, errors.Trace(err) } @@ -462,7 +462,7 @@ func (c *ormClient) querySchedulesByOwnerIDAndUpdateAt(tx *gorm.DB, captureID st //nolint:unused func (c *ormClient) queryScheduleByUUID(tx *gorm.DB, uuid uint64) (*ScheduleDO, error) { schedule := &ScheduleDO{} - ret := tx.Where("changefeed_uuid = ?", uuid).First(schedule) + ret := tx.Where("changefeed_uuid = ?", uuid).Limit(1).Find(schedule) if err := handleSingleOpErr(ret, 1, "QueryScheduleByUUID"); err != nil { return nil, errors.Trace(err) } @@ -513,25 +513,25 @@ func (c *ormClient) updateProgress(tx 
*gorm.DB, pr *ProgressDO) error { return nil } -// queryProgresss implements the progressClient interface. +// queryProgresses implements the progressClient interface. // //nolint:unused -func (c *ormClient) queryProgresss(tx *gorm.DB) ([]*ProgressDO, error) { +func (c *ormClient) queryProgresses(tx *gorm.DB) ([]*ProgressDO, error) { progresses := []*ProgressDO{} ret := tx.Find(&progresses) - if err := handleSingleOpErr(ret, -1, "QueryProgresss"); err != nil { + if err := handleSingleOpErr(ret, -1, "queryProgresses"); err != nil { return nil, errors.Trace(err) } return progresses, nil } -// queryProgresssByUpdateAt implements the progressClient interface. +// queryProgressesByUpdateAt implements the progressClient interface. // //nolint:unused -func (c *ormClient) queryProgresssByUpdateAt(tx *gorm.DB, lastUpdateAt time.Time) ([]*ProgressDO, error) { +func (c *ormClient) queryProgressesByUpdateAt(tx *gorm.DB, lastUpdateAt time.Time) ([]*ProgressDO, error) { progresses := []*ProgressDO{} ret := tx.Where("update_at > ?", lastUpdateAt).Find(&progresses) - if err := handleSingleOpErr(ret, -1, "QueryProgresssByUpdateAt"); err != nil { + if err := handleSingleOpErr(ret, -1, "queryProgressesByUpdateAt"); err != nil { return nil, errors.Trace(err) } return progresses, nil @@ -542,7 +542,7 @@ func (c *ormClient) queryProgresssByUpdateAt(tx *gorm.DB, lastUpdateAt time.Time //nolint:unused func (c *ormClient) queryProgressByCaptureID(tx *gorm.DB, id string) (*ProgressDO, error) { progress := &ProgressDO{} - ret := tx.Where("capture_id = ?", id).First(progress) + ret := tx.Where("capture_id = ?", id).Limit(1).Find(progress) if err := handleSingleOpErr(ret, 1, "QueryProgressByCaptureID"); err != nil { return nil, errors.Trace(err) } diff --git a/cdcv2/metadata/sql/client_test.go b/cdcv2/metadata/sql/client_test.go index 02fde5fd8f6..0812c1fe994 100644 --- a/cdcv2/metadata/sql/client_test.go +++ b/cdcv2/metadata/sql/client_test.go @@ -396,6 +396,44 @@ func 
TestProgressClientExecSQL(t *testing.T) {
 
 // ================================ Test Query =================================
 
+type queryType int32
+
+const (
+	queryTypePoint queryType = iota
+	queryTypeRange
+	queryTypeFullTable
+)
+
+func runMockQueryTest(
+	_ *testing.T, mock sqlmock.Sqlmock,
+	expectedSQL string, args []driver.Value,
+	columns []string, rows []interface{},
+	getValue func(interface{}) []driver.Value,
+	runQuery func(expectedRowsCnt int, expectedError error),
+	queryTp queryType,
+) {
+	// Test normal execution
+	returnRows := sqlmock.NewRows(columns)
+	for _, row := range rows {
+		returnRows.AddRow(getValue(row)...)
+	}
+	mock.ExpectQuery(expectedSQL).WithArgs(args...).WillReturnRows(returnRows)
+	runQuery(len(rows), nil)
+
+	// Test return empty rows
+	mock.ExpectQuery(expectedSQL).WithArgs(args...).WillReturnRows(sqlmock.NewRows(columns))
+	if queryTp == queryTypePoint {
+		runQuery(0, errors.ErrMetaRowsAffectedNotMatch)
+	} else {
+		runQuery(0, nil)
+	}
+
+	// Test return error
+	testErr := errors.New("test error")
+	mock.ExpectQuery(expectedSQL).WithArgs(args...).WillReturnError(testErr)
+	runQuery(0, testErr)
+}
+
 func TestUpstreamClientQuerySQL(t *testing.T) {
 	t.Parallel()
 
@@ -403,17 +441,745 @@ func TestUpstreamClientQuerySQL(t *testing.T) {
 	defer backendDB.Close()
 	client := NewORMClient("test-upstream-client-query", db)
 
+	rows := []*UpstreamDO{
+		{
+			ID:        1,
+			Endpoints: strings.Join([]string{"endpoint1", "endpoint2"}, ","),
+			Config:    nil, /* test nil */
+			Version:   1,
+			UpdateAt:  time.Now(),
+		},
+		{
+			ID:        2,
+			Endpoints: strings.Join([]string{"endpoint3", "endpoint4"}, ","),
+			Config:    &security.Credential{}, /* test empty */
+			Version:   2,
+			UpdateAt:  time.Now(),
+		},
+	}
+
 	// Test queryUpstreams
-	expectedSQL := "SELECT * FROM `upstream`"
-	mock.ExpectQuery(expectedSQL).WillReturnRows(
-		sqlmock.NewRows([]string{"id", "endpoints", "config", "version", "update_at"}).
- AddRow(1, []byte("endpoint1,endpoint2"), nil, 1, time.Now()), + expectedQueryUpstreams := rows + queryUpstreamsRows := []interface{}{expectedQueryUpstreams[0], expectedQueryUpstreams[1]} + runMockQueryTest(t, mock, + "SELECT * FROM `upstream`", nil, + []string{"id", "endpoints", "config", "version", "update_at"}, + queryUpstreamsRows, + func(r interface{}) []driver.Value { + row, ok := r.(*UpstreamDO) + require.True(t, ok) + return []driver.Value{row.ID, row.Endpoints, row.Config, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + upstreams, err := client.queryUpstreams(db) + require.ErrorIs(t, err, expectedError) + require.Len(t, upstreams, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryUpstreams, upstreams) + } + }, + queryTypeFullTable, + ) + + // Test queryUpstreamsByUpdateAt + expectedQueryUpstreamsByUpdateAt := rows + queryUpstreamsByUpdateAtRows := []interface{}{expectedQueryUpstreamsByUpdateAt[0], expectedQueryUpstreamsByUpdateAt[1]} + queryAt := time.Now() + runMockQueryTest(t, mock, + "SELECT * FROM `upstream` WHERE update_at > ?", []driver.Value{queryAt}, + []string{"id", "endpoints", "config", "version", "update_at"}, + queryUpstreamsByUpdateAtRows, + func(r interface{}) []driver.Value { + row, ok := r.(*UpstreamDO) + require.True(t, ok) + return []driver.Value{row.ID, row.Endpoints, row.Config, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + upstreams, err := client.queryUpstreamsByUpdateAt(db, queryAt) + require.ErrorIs(t, err, expectedError) + require.Len(t, upstreams, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryUpstreamsByUpdateAt, upstreams) + } + }, + queryTypeRange, + ) + + // Test queryUpstreamByID + for _, row := range rows { + expectedQueryUpstreamByID := row + queryUpstreamByIDRows := []interface{}{row} + runMockQueryTest(t, mock, + "SELECT * FROM `upstream` WHERE id = ? 
LIMIT 1", + []driver.Value{expectedQueryUpstreamByID.ID}, + []string{"id", "endpoints", "config", "version", "update_at"}, + queryUpstreamByIDRows, + func(r interface{}) []driver.Value { + row, ok := r.(*UpstreamDO) + require.True(t, ok) + return []driver.Value{row.ID, row.Endpoints, row.Config, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + upstream, err := client.queryUpstreamByID(db, expectedQueryUpstreamByID.ID) + require.ErrorIs(t, err, expectedError) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryUpstreamByID, upstream) + } else { + require.Nil(t, upstream) + } + }, + queryTypePoint, + ) + } +} + +func TestChangefeedInfoClientQuerySQL(t *testing.T) { + t.Parallel() + + backendDB, db, mock := newMockDB(t) + defer backendDB.Close() + client := NewORMClient("test-changefeed-info-client-query", db) + + rows := []*ChangefeedInfoDO{ + { + ChangefeedInfo: metadata.ChangefeedInfo{ + ChangefeedIdent: metadata.ChangefeedIdent{ + UUID: 1, + Namespace: "namespace", + ID: "id", + }, + UpstreamID: 1, + SinkURI: "sinkURI", + StartTs: 1, + TargetTs: 1, + Config: nil, /* test nil */ + }, + RemovedAt: nil, /* test nil */ + Version: 1, + UpdateAt: time.Now(), + }, + { + ChangefeedInfo: metadata.ChangefeedInfo{ + ChangefeedIdent: metadata.ChangefeedIdent{ + UUID: 2, + Namespace: "namespace", + ID: "id", + }, + UpstreamID: 2, + SinkURI: "sinkURI", + StartTs: 2, + TargetTs: 2, + Config: &config.ReplicaConfig{}, /* test empty */ + }, + RemovedAt: &time.Time{}, /* test empty */ + Version: 2, + UpdateAt: time.Now(), + }, + } + + // Test queryChangefeedInfos + expectedQueryChangefeedInfos := rows + queryChangefeedInfosRows := []interface{}{expectedQueryChangefeedInfos[0], expectedQueryChangefeedInfos[1]} + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_info`", nil, + []string{ + "uuid", "namespace", "id", "upstream_id", "sink_uri", + "start_ts", "target_ts", "config", "removed_at", + "version", "update_at", + }, + 
queryChangefeedInfosRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedInfoDO) + require.True(t, ok) + return []driver.Value{ + row.UUID, row.Namespace, row.ID, row.UpstreamID, row.SinkURI, + row.StartTs, row.TargetTs, row.Config, row.RemovedAt, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + changefeedInfos, err := client.queryChangefeedInfos(db) + require.ErrorIs(t, err, expectedError) + require.Len(t, changefeedInfos, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedInfos, changefeedInfos) + } + }, + queryTypeFullTable, + ) + + // Test queryChangefeedInfosByUpdateAt + expectedQueryChangefeedInfosByUpdateAt := rows + queryChangefeedInfosByUpdateAtRows := []interface{}{ + expectedQueryChangefeedInfosByUpdateAt[0], + expectedQueryChangefeedInfosByUpdateAt[1], + } + queryAt := time.Now() + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_info` WHERE update_at > ?", []driver.Value{queryAt}, + []string{ + "uuid", "namespace", "id", "upstream_id", "sink_uri", + "start_ts", "target_ts", "config", "removed_at", + "version", "update_at", + }, + queryChangefeedInfosByUpdateAtRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedInfoDO) + require.True(t, ok) + return []driver.Value{ + row.UUID, row.Namespace, row.ID, row.UpstreamID, row.SinkURI, + row.StartTs, row.TargetTs, row.Config, row.RemovedAt, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + changefeedInfos, err := client.queryChangefeedInfosByUpdateAt(db, queryAt) + require.ErrorIs(t, err, expectedError) + require.Len(t, changefeedInfos, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedInfosByUpdateAt, changefeedInfos) + } + }, + queryTypeRange, + ) + + // Test queryChangefeedInfosByUUIDs + expectedQueryChangefeedInfosByUUIDs := rows + queryChangefeedInfosByUUIDsRows := []interface{}{ + 
expectedQueryChangefeedInfosByUUIDs[0], + expectedQueryChangefeedInfosByUUIDs[1], + } + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_info` WHERE uuid IN (?,?)", []driver.Value{1, 2}, + []string{ + "uuid", "namespace", "id", "upstream_id", "sink_uri", + "start_ts", "target_ts", "config", "removed_at", + "version", "update_at", + }, + queryChangefeedInfosByUUIDsRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedInfoDO) + require.True(t, ok) + return []driver.Value{ + row.UUID, row.Namespace, row.ID, row.UpstreamID, row.SinkURI, + row.StartTs, row.TargetTs, row.Config, row.RemovedAt, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + changefeedInfos, err := client.queryChangefeedInfosByUUIDs(db, 1, 2) + require.ErrorIs(t, err, expectedError) + require.Len(t, changefeedInfos, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedInfosByUUIDs, changefeedInfos) + } + }, + queryTypePoint, + ) + + // Test queryChangefeedInfoByUUID + for _, row := range rows { + expectedQueryChangefeedInfoByUUID := row + queryChangefeedInfoByUUIDRows := []interface{}{row} + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_info` WHERE uuid = ? 
LIMIT 1", + []driver.Value{expectedQueryChangefeedInfoByUUID.UUID}, + []string{ + "uuid", "namespace", "id", "upstream_id", "sink_uri", + "start_ts", "target_ts", "config", "removed_at", + "version", "update_at", + }, + queryChangefeedInfoByUUIDRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedInfoDO) + require.True(t, ok) + return []driver.Value{ + row.UUID, row.Namespace, row.ID, row.UpstreamID, row.SinkURI, + row.StartTs, row.TargetTs, row.Config, row.RemovedAt, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + changefeedInfo, err := client.queryChangefeedInfoByUUID(db, expectedQueryChangefeedInfoByUUID.UUID) + require.ErrorIs(t, err, expectedError) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedInfoByUUID, changefeedInfo) + } else { + require.Nil(t, changefeedInfo) + } + }, + queryTypePoint, + ) + } +} + +func TestChangefeedStateClientQuerySQL(t *testing.T) { + t.Parallel() + + backendDB, db, mock := newMockDB(t) + defer backendDB.Close() + client := NewORMClient("test-changefeed-state-client-query", db) + + rows := []*ChangefeedStateDO{ + { + ChangefeedState: metadata.ChangefeedState{ + ChangefeedUUID: 1, + State: "state", + // Note that warning and error could be nil. 
+ Warning: nil, /* test nil */ + Error: &model.RunningError{}, /* test empty*/ + }, + Version: 1, + UpdateAt: time.Now(), + }, + { + ChangefeedState: metadata.ChangefeedState{ + ChangefeedUUID: 2, + State: "state", + Warning: &model.RunningError{ + // ref: TestRunningErrorScan + // Time: time.Now(), + Addr: "addr", + Code: "warn", + }, + Error: &model.RunningError{ + // Time: time.Now(), + Addr: "addr", + Code: "error", + }, + }, + Version: 2, + UpdateAt: time.Now(), + }, + } + + // Test queryChangefeedStates + expectedQueryChangefeedStates := rows + queryChangefeedStatesRows := []interface{}{expectedQueryChangefeedStates[0], expectedQueryChangefeedStates[1]} + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_state`", nil, + []string{"changefeed_uuid", "state", "warning", "error", "version", "update_at"}, + queryChangefeedStatesRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedStateDO) + require.True(t, ok) + return []driver.Value{row.ChangefeedUUID, row.State, row.Warning, row.Error, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + changefeedStates, err := client.queryChangefeedStates(db) + require.ErrorIs(t, err, expectedError) + require.Len(t, changefeedStates, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedStates, changefeedStates) + } + }, + queryTypeFullTable, + ) + + // Test queryChangefeedStatesByUpdateAt + expectedQueryChangefeedStatesByUpdateAt := rows + queryChangefeedStatesByUpdateAtRows := []interface{}{ + expectedQueryChangefeedStatesByUpdateAt[0], + expectedQueryChangefeedStatesByUpdateAt[1], + } + queryAt := time.Now() + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_state` WHERE update_at > ?", []driver.Value{queryAt}, + []string{"changefeed_uuid", "state", "warning", "error", "version", "update_at"}, + queryChangefeedStatesByUpdateAtRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedStateDO) + require.True(t, ok) + 
return []driver.Value{row.ChangefeedUUID, row.State, row.Warning, row.Error, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + changefeedStates, err := client.queryChangefeedStatesByUpdateAt(db, queryAt) + require.ErrorIs(t, err, expectedError) + require.Len(t, changefeedStates, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedStatesByUpdateAt, changefeedStates) + } + }, + queryTypeRange, + ) + + // Test queryChangefeedStateByUUID + for _, row := range rows { + expectedQueryChangefeedStateByUUID := row + queryChangefeedStateByUUIDRows := []interface{}{row} + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_state` WHERE changefeed_uuid = ? LIMIT 1", + []driver.Value{expectedQueryChangefeedStateByUUID.ChangefeedUUID}, + []string{"changefeed_uuid", "state", "warning", "error", "version", "update_at"}, + queryChangefeedStateByUUIDRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedStateDO) + require.True(t, ok) + return []driver.Value{row.ChangefeedUUID, row.State, row.Warning, row.Error, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + changefeedState, err := client.queryChangefeedStateByUUID(db, expectedQueryChangefeedStateByUUID.ChangefeedUUID) + require.ErrorIs(t, err, expectedError) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedStateByUUID, changefeedState) + } else { + require.Nil(t, changefeedState) + } + }, + queryTypePoint, + ) + } + + // Test queryChangefeedStateByUUIDWithLock + for _, row := range rows { + expectedQueryChangefeedStateByUUIDWithLock := row + queryChangefeedStateByUUIDWithLockRows := []interface{}{row} + runMockQueryTest(t, mock, + "SELECT * FROM `changefeed_state` WHERE changefeed_uuid = ? 
LIMIT 1 LOCK IN SHARE MODE", + []driver.Value{expectedQueryChangefeedStateByUUIDWithLock.ChangefeedUUID}, + []string{"changefeed_uuid", "state", "warning", "error", "version", "update_at"}, + queryChangefeedStateByUUIDWithLockRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ChangefeedStateDO) + require.True(t, ok) + return []driver.Value{row.ChangefeedUUID, row.State, row.Warning, row.Error, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + changefeedState, err := client.queryChangefeedStateByUUIDWithLock(db, expectedQueryChangefeedStateByUUIDWithLock.ChangefeedUUID) + require.ErrorIs(t, err, expectedError) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryChangefeedStateByUUIDWithLock, changefeedState) + } else { + require.Nil(t, changefeedState) + } + }, + queryTypePoint, + ) + } +} + +func TestScheduleClientQuerySQL(t *testing.T) { + t.Parallel() + + backendDB, db, mock := newMockDB(t) + defer backendDB.Close() + client := NewORMClient("test-schedule-client-query", db) + + ownerCapture := "test-schedule-client-query" + rows := []*ScheduleDO{ + { + ScheduledChangefeed: metadata.ScheduledChangefeed{ + ChangefeedUUID: 1, + Owner: nil, /* test nil */ + OwnerState: metadata.SchedRemoved, + Processors: nil, /* test nil */ + TaskPosition: metadata.ChangefeedProgress{ + CheckpointTs: 1, + }, + }, + Version: 1, + UpdateAt: time.Now(), + }, + { + ScheduledChangefeed: metadata.ScheduledChangefeed{ + ChangefeedUUID: 2, + Owner: &ownerCapture, + OwnerState: metadata.SchedRemoved, + Processors: &ownerCapture, + TaskPosition: metadata.ChangefeedProgress{ + CheckpointTs: 2, + }, + }, + Version: 2, + UpdateAt: time.Now(), + }, + } + + // Test querySchedules + expectedQuerySchedules := rows + querySchedulesRows := []interface{}{expectedQuerySchedules[0], expectedQuerySchedules[1]} + runMockQueryTest(t, mock, + "SELECT * FROM `schedule`", nil, + []string{ + "changefeed_uuid", "owner", "owner_state", "processors", 
"task_position", + "version", "update_at", + }, + querySchedulesRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ScheduleDO) + require.True(t, ok) + return []driver.Value{ + row.ChangefeedUUID, row.Owner, row.OwnerState, row.Processors, row.TaskPosition, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + schedules, err := client.querySchedules(db) + require.ErrorIs(t, err, expectedError) + require.Len(t, schedules, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQuerySchedules, schedules) + } + }, + queryTypeFullTable, + ) + + // Test querySchedulesByUpdateAt + expectedQuerySchedulesByUpdateAt := rows + querySchedulesByUpdateAtRows := []interface{}{ + expectedQuerySchedulesByUpdateAt[0], + expectedQuerySchedulesByUpdateAt[1], + } + queryAt := time.Now() + runMockQueryTest(t, mock, + "SELECT * FROM `schedule` WHERE update_at > ?", []driver.Value{queryAt}, + []string{ + "changefeed_uuid", "owner", "owner_state", "processors", "task_position", + "version", "update_at", + }, + querySchedulesByUpdateAtRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ScheduleDO) + require.True(t, ok) + return []driver.Value{ + row.ChangefeedUUID, row.Owner, row.OwnerState, row.Processors, row.TaskPosition, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + schedules, err := client.querySchedulesByUpdateAt(db, queryAt) + require.ErrorIs(t, err, expectedError) + require.Len(t, schedules, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQuerySchedulesByUpdateAt, schedules) + } + }, + queryTypeRange, + ) + + // Test querySchedulesByOwnerIDAndUpdateAt + expectedQuerySchedulesByOwnerIDAndUpdateAt := rows + querySchedulesByOwnerIDAndUpdateAtRows := []interface{}{ + expectedQuerySchedulesByOwnerIDAndUpdateAt[0], + expectedQuerySchedulesByOwnerIDAndUpdateAt[1], + } + queryAt = time.Now() + runMockQueryTest(t, mock, + "SELECT * FROM `schedule` 
WHERE owner = ? and update_at > ?", []driver.Value{ownerCapture, queryAt}, + []string{ + "changefeed_uuid", "owner", "owner_state", "processors", "task_position", + "version", "update_at", + }, + querySchedulesByOwnerIDAndUpdateAtRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ScheduleDO) + require.True(t, ok) + return []driver.Value{ + row.ChangefeedUUID, row.Owner, row.OwnerState, row.Processors, row.TaskPosition, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + schedules, err := client.querySchedulesByOwnerIDAndUpdateAt(db, ownerCapture, queryAt) + require.ErrorIs(t, err, expectedError) + require.Len(t, schedules, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQuerySchedulesByOwnerIDAndUpdateAt, schedules) + } + }, + queryTypeRange, + ) + + // Test queryScheduleByUUID + for _, row := range rows { + expectedQueryScheduleByUUID := row + queryScheduleByUUIDRows := []interface{}{row} + runMockQueryTest(t, mock, + "SELECT * FROM `schedule` WHERE changefeed_uuid = ? 
LIMIT 1", + []driver.Value{expectedQueryScheduleByUUID.ChangefeedUUID}, + []string{ + "changefeed_uuid", "owner", "owner_state", "processors", "task_position", + "version", "update_at", + }, + queryScheduleByUUIDRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ScheduleDO) + require.True(t, ok) + return []driver.Value{ + row.ChangefeedUUID, row.Owner, row.OwnerState, row.Processors, row.TaskPosition, + row.Version, row.UpdateAt, + } + }, + func(expectedRowsCnt int, expectedError error) { + schedule, err := client.queryScheduleByUUID(db, expectedQueryScheduleByUUID.ChangefeedUUID) + require.ErrorIs(t, err, expectedError) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQueryScheduleByUUID, schedule) + } else { + require.Nil(t, schedule) + } + }, + queryTypePoint, + ) + } + + // Test querySchedulesUinqueOwnerIDs + expectedQuerySchedulesUinqueOwnerIDs := []string{"owner1", "owner2"} + querySchedulesUinqueOwnerIDsRows := []interface{}{ + expectedQuerySchedulesUinqueOwnerIDs[0], + expectedQuerySchedulesUinqueOwnerIDs[1], + } + runMockQueryTest(t, mock, + "SELECT DISTINCT `owner` FROM `schedule` WHERE owner IS NOT NULL", nil, + []string{"owner"}, + querySchedulesUinqueOwnerIDsRows, + func(r interface{}) []driver.Value { + row, ok := r.(string) + require.True(t, ok) + return []driver.Value{row} + }, + func(expectedRowsCnt int, expectedError error) { + ownerIDs, err := client.querySchedulesUinqueOwnerIDs(db) + require.ErrorIs(t, err, expectedError) + require.Len(t, ownerIDs, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedQuerySchedulesUinqueOwnerIDs, ownerIDs) + } + }, + queryTypeFullTable, + ) +} + +func TestProgressClientQuerySQL(t *testing.T) { + t.Parallel() + + backendDB, db, mock := newMockDB(t) + defer backendDB.Close() + client := NewORMClient("test-progress-client-query", db) + + rows := []*ProgressDO{ + { + CaptureID: "captureID-1", + Progress: &metadata.CaptureProgress{ + 1: { + CheckpointTs: 1, + MinTableBarrierTs: 1, 
+ }, + 2: { + CheckpointTs: 2, + MinTableBarrierTs: 2, + }, + }, + Version: 1, + UpdateAt: time.Now(), + }, + { + CaptureID: "captureID-2", + Progress: &metadata.CaptureProgress{}, + Version: 2, + UpdateAt: time.Now(), + }, + } + + // Test queryProgresses + expectedqueryProgresses := rows + queryProgressesRows := []interface{}{expectedqueryProgresses[0], expectedqueryProgresses[1]} + runMockQueryTest(t, mock, + "SELECT * FROM `progress`", nil, + []string{"capture_id", "progress", "version", "update_at"}, + queryProgressesRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ProgressDO) + require.True(t, ok) + return []driver.Value{row.CaptureID, row.Progress, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + progresses, err := client.queryProgresses(db) + require.ErrorIs(t, err, expectedError) + require.Len(t, progresses, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedqueryProgresses, progresses) + } + }, + queryTypeFullTable, + ) + + // Test queryProgressesByUpdateAt + expectedqueryProgressesByUpdateAt := rows + queryProgressesByUpdateAtRows := []interface{}{ + expectedqueryProgressesByUpdateAt[0], + expectedqueryProgressesByUpdateAt[1], + } + queryAt := time.Now() + runMockQueryTest(t, mock, + "SELECT * FROM `progress` WHERE update_at > ?", []driver.Value{queryAt}, + []string{"capture_id", "progress", "version", "update_at"}, + queryProgressesByUpdateAtRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ProgressDO) + require.True(t, ok) + return []driver.Value{row.CaptureID, row.Progress, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + progresses, err := client.queryProgressesByUpdateAt(db, queryAt) + require.ErrorIs(t, err, expectedError) + require.Len(t, progresses, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedqueryProgressesByUpdateAt, progresses) + } + }, + queryTypeRange, + ) + + // Test queryProgressByCaptureID + for 
_, row := range rows { + expectedqueryProgressByCaptureID := row + queryProgressByCaptureIDRows := []interface{}{row} + runMockQueryTest(t, mock, + "SELECT * FROM `progress` WHERE capture_id = ? LIMIT 1", + []driver.Value{expectedqueryProgressByCaptureID.CaptureID}, + []string{"capture_id", "progress", "version", "update_at"}, + queryProgressByCaptureIDRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ProgressDO) + require.True(t, ok) + return []driver.Value{row.CaptureID, row.Progress, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + progress, err := client.queryProgressByCaptureID(db, expectedqueryProgressByCaptureID.CaptureID) + require.ErrorIs(t, err, expectedError) + if expectedRowsCnt != 0 { + require.Equal(t, expectedqueryProgressByCaptureID, progress) + } else { + require.Nil(t, progress) + } + }, + queryTypePoint, + ) + } + + // Test queryProgressByCaptureIDsWithLock + expectedqueryProgressByCaptureIDsWithLock := rows + queryProgressByCaptureIDsWithLockRows := []interface{}{rows[0], rows[1]} + captureIDs := []string{expectedqueryProgressByCaptureIDsWithLock[0].CaptureID, expectedqueryProgressByCaptureIDsWithLock[1].CaptureID} + runMockQueryTest(t, mock, + "SELECT * FROM `progress` WHERE capture_id in (?,?) 
LOCK IN SHARE MODE", + []driver.Value{expectedqueryProgressByCaptureIDsWithLock[0].CaptureID, expectedqueryProgressByCaptureIDsWithLock[1].CaptureID}, + []string{"capture_id", "progress", "version", "update_at"}, + queryProgressByCaptureIDsWithLockRows, + func(r interface{}) []driver.Value { + row, ok := r.(*ProgressDO) + require.True(t, ok) + return []driver.Value{row.CaptureID, row.Progress, row.Version, row.UpdateAt} + }, + func(expectedRowsCnt int, expectedError error) { + progress, err := client.queryProgressByCaptureIDsWithLock(db, captureIDs) + require.ErrorIs(t, err, expectedError) + require.Len(t, progress, expectedRowsCnt) + if expectedRowsCnt != 0 { + require.Equal(t, expectedqueryProgressByCaptureIDsWithLock, progress) + } + }, + queryTypeRange, ) - upstreams, err := client.queryUpstreams(db) - require.NoError(t, err) - require.Len(t, upstreams, 1) - require.Equal(t, uint64(1), upstreams[0].ID) - require.Equal(t, "endpoint1,endpoint2", upstreams[0].Endpoints) - require.Nil(t, upstreams[0].Config) - require.Equal(t, uint64(1), upstreams[0].Version) } diff --git a/cdcv2/metadata/sql/observation.go b/cdcv2/metadata/sql/observation.go index e1cdba99fb3..ac049976129 100644 --- a/cdcv2/metadata/sql/observation.go +++ b/cdcv2/metadata/sql/observation.go @@ -278,7 +278,7 @@ func (c *CaptureOb[T]) GetChangefeedProgress( var prDOs []*ProgressDO var scDOs []*ScheduleDO err = c.client.Txn(c.egCtx, func(tx T) error { - prDOs, err = c.client.queryProgresss(tx) + prDOs, err = c.client.queryProgresses(tx) if err != nil { return err } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 7e67a43cf0c..64413bb4a9b 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -199,6 +199,10 @@ func TestParseCfg(t *testing.T) { CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, }, ClusterID: "default", 
MaxMemoryPercentage: config.DisableMemoryLimit, @@ -338,6 +342,10 @@ check-balance-interval = "10s" CheckBalanceInterval: config.TomlDuration(10 * time.Second), AddTableBatchSize: 50, }, + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -469,6 +477,10 @@ cert-allowed-cn = ["dd","ee"] CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -527,5 +539,9 @@ unknown3 = 3 CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, }, o.serverConfig.Debug) } diff --git a/pkg/config/cdc_v2.go b/pkg/config/cdc_v2.go new file mode 100644 index 00000000000..1869e45d4f0 --- /dev/null +++ b/pkg/config/cdc_v2.go @@ -0,0 +1,133 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package config + +import ( + "fmt" + "net" + "net/url" + + dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/security" +) + +// CDCV2 represents config for ticdc v2 +type CDCV2 struct { + // Enable represents if the cdc v2 is enabled or not + Enable bool `toml:"enable" json:"enable"` + // MetaStoreConfig represents config for new meta store configurations + MetaStoreConfig MetaStoreConfiguration `toml:"meta-store" json:"meta-store"` +} + +// MetaStoreConfiguration represents config for new meta store configurations +type MetaStoreConfiguration struct { + // URI is the address of the meta store. + // for example: "mysql://127.0.0.1:3306/test" + URI string `toml:"uri" json:"uri"` + // SSLCA is the path of the CA certificate file. + SSLCa string `toml:"ssl-ca" json:"ssl-ca"` + SSLCert string `toml:"ssl-cert" json:"ssl-cert"` + SSLKey string `toml:"ssl-key" json:"ssl-key"` +} + +// ValidateAndAdjust validates the meta store configurations +func (c *CDCV2) ValidateAndAdjust() error { + if !c.Enable { + return nil + } + if c.MetaStoreConfig.URI == "" { + return errors.New("missing meta store uri configuration") + } + parsedURI, err := url.Parse(c.MetaStoreConfig.URI) + if err != nil { + return errors.Trace(err) + } + if !isSupportedScheme(parsedURI.Scheme) { + return errors.Errorf("the %s scheme is not supported by meta store", parsedURI.Scheme) + } + return nil +} + +// GenDSN generates a DSN from the given metastore config. 
+func (cfg *MetaStoreConfiguration) GenDSN() (*dmysql.Config, error) { + endpoint, err := url.Parse(cfg.URI) + if err != nil { + return nil, errors.Trace(err) + } + tls, err := cfg.getSSLParam() + if err != nil { + return nil, errors.Trace(err) + } + username := endpoint.User.Username() + if username == "" { + username = "root" + } + password, _ := endpoint.User.Password() + + hostName := endpoint.Hostname() + port := endpoint.Port() + if port == "" { + port = "3306" + } + + // This will handle the IPv6 address format. + var dsn *dmysql.Config + host := net.JoinHostPort(hostName, port) + // dsn format of the driver: + // [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] + dsnStr := fmt.Sprintf("%s:%s@tcp(%s)%s%s", username, password, host, endpoint.Path, tls) + if dsn, err = dmysql.ParseDSN(dsnStr); err != nil { + return nil, errors.Trace(err) + } + + // create test db used for parameter detection + // Refer https://github.com/go-sql-driver/mysql#parameters + if dsn.Params == nil { + dsn.Params = make(map[string]string) + } + // enable parseTime for time.Time type + dsn.Params["parseTime"] = "true" + for key, pa := range endpoint.Query() { + dsn.Params[key] = pa[0] + } + return dsn, nil +} + +func (cfg *MetaStoreConfiguration) getSSLParam() (string, error) { + if len(cfg.SSLCa) == 0 || len(cfg.SSLCert) == 0 || len(cfg.SSLKey) == 0 { + return "", nil + } + credential := security.Credential{ + CAPath: cfg.SSLCa, + CertPath: cfg.SSLCert, + KeyPath: cfg.SSLKey, + } + tlsCfg, err := credential.ToTLSConfig() + if err != nil { + return "", errors.Trace(err) + } + name := "cdc_mysql_tls_meta_store" + err = dmysql.RegisterTLSConfig(name, tlsCfg) + if err != nil { + return "", cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") + } + return "?tls=" + name, nil +} + +// isSupportedScheme returns true if the scheme is compatible with MySQL. 
+func isSupportedScheme(scheme string) bool { + return scheme == "mysql" +} diff --git a/pkg/config/cdc_v2_test.go b/pkg/config/cdc_v2_test.go new file mode 100644 index 00000000000..43e148cde18 --- /dev/null +++ b/pkg/config/cdc_v2_test.go @@ -0,0 +1,55 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDefaultCDCV2Config(t *testing.T) { + defaultCDCV2 := defaultServerConfig.Debug.CDCV2 + require.NotNil(t, defaultCDCV2) + require.False(t, defaultCDCV2.Enable) +} + +func TestCDCV2ValidateAndAdjust(t *testing.T) { + cdcV2 := &CDCV2{ + Enable: false, + MetaStoreConfig: MetaStoreConfiguration{}, + } + require.Nil(t, cdcV2.ValidateAndAdjust()) + cdcV2.Enable = true + require.NotNil(t, cdcV2.ValidateAndAdjust()) + cdcV2.MetaStoreConfig.URI = "http://127.0.0.1" + require.NotNil(t, cdcV2.ValidateAndAdjust()) + cdcV2.MetaStoreConfig.URI = "mysql://127.0.0.1" + require.Nil(t, cdcV2.ValidateAndAdjust()) +} + +func TestGenDSN(t *testing.T) { + storeConfig := &MetaStoreConfiguration{ + URI: "mysql://root:abcd@127.0.0.1:4000/cdc?a=c&timeout=1m", + } + dsn, err := storeConfig.GenDSN() + require.Nil(t, err) + require.Equal(t, "root", dsn.User) + require.Equal(t, "abcd", dsn.Passwd) + require.Equal(t, "127.0.0.1:4000", dsn.Addr) + require.Equal(t, "cdc", dsn.DBName) + require.Equal(t, "true", dsn.Params["parseTime"]) + require.Equal(t, "1m", dsn.Params["timeout"]) + require.Equal(t, "c", dsn.Params["a"]) +} diff --git 
a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 35caa03f18d..18d803b30f6 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -157,7 +157,16 @@ const ( "check-balance-interval": 60000000000, "add-table-batch-size": 50 }, - "enable-kv-connect-backoff": false + "enable-kv-connect-backoff": false, + "cdc-v2": { + "enable": false, + "meta-store": { + "uri": "", + "ssl-ca": "", + "ssl-cert": "", + "ssl-key": "" + } + } }, "cluster-id": "default", "max-memory-percentage": 0, diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 3a7815090bc..d5c24cd10d2 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -28,6 +28,9 @@ type DebugConfig struct { // EnableKVConnectBackOff enables the backoff for kv connect. EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"` + + // CDCV2 enables ticdc version 2 implementation with new metastore + CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"` } // ValidateAndAdjust validates and adjusts the debug configuration @@ -41,6 +44,9 @@ func (c *DebugConfig) ValidateAndAdjust() error { if err := c.Scheduler.ValidateAndAdjust(); err != nil { return errors.Trace(err) } + if err := c.CDCV2.ValidateAndAdjust(); err != nil { + return errors.Trace(err) + } return nil } diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 2c8f37f6185..dcf193c94a3 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -143,7 +143,13 @@ type replicaConfig struct { // Value implements the driver.Valuer interface func (c ReplicaConfig) Value() (driver.Value, error) { - return c.Marshal() + cfg, err := c.Marshal() + if err != nil { + return nil, err + } + + // TODO: refactor the meaningless type conversion. 
+ return []byte(cfg), nil } // Scan implements the sql.Scanner interface diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index eda9f95c8e5..bac3c1384be 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -134,6 +134,7 @@ var defaultServerConfig = &ServerConfig{ Scheduler: NewDefaultSchedulerConfig(), EnableKVConnectBackOff: false, + CDCV2: &CDCV2{Enable: false}, }, ClusterID: "default", GcTunerMemoryThreshold: DisableMemoryLimit,