From 35cb58c02dce4ba66bb0338a9b947c0b985282d1 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 23 Oct 2023 17:34:55 +0800 Subject: [PATCH 1/4] add sql config for changefeed --- cdc/api/v2/model.go | 5 ++- cdc/api/v2/model_test.go | 1 + cdc/owner/ddl_sink.go | 9 ++++- pkg/config/config_test_data.go | 9 +++-- pkg/config/replica_config.go | 4 ++ pkg/filter/expr_filter.go | 22 ++++++++--- pkg/filter/expr_filter_bench_test.go | 2 +- pkg/filter/expr_filter_test.go | 8 ++-- pkg/filter/filter.go | 4 +- pkg/filter/sql_event_filter.go | 14 ++++++- pkg/filter/sql_event_filter_test.go | 4 +- pkg/filter/utils_test.go | 59 ++++++++++++++++++++++++++++ scripts/check-diff-line-width.sh | 1 + 13 files changed, 121 insertions(+), 21 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index af9887b0240..4e5c447a414 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -193,7 +193,8 @@ type ReplicaConfig struct { Consistent *ConsistentConfig `json:"consistent,omitempty"` Scheduler *ChangefeedSchedulerConfig `json:"scheduler"` Integrity *IntegrityConfig `json:"integrity"` - ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty" swaggertype:"string"` + ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"` + SQLMode string `json:"sql_mode,omitempty"` } // ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig @@ -211,6 +212,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.CheckGCSafePoint = c.CheckGCSafePoint res.EnableSyncPoint = c.EnableSyncPoint res.IgnoreIneligibleTable = c.IgnoreIneligibleTable + res.SQLMode = c.SQLMode if c.SyncPointInterval != nil { res.SyncPointInterval = &c.SyncPointInterval.duration } @@ -497,6 +499,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { CheckGCSafePoint: cloned.CheckGCSafePoint, EnableSyncPoint: cloned.EnableSyncPoint, BDRMode: cloned.BDRMode, + SQLMode: cloned.SQLMode, } if cloned.SyncPointInterval != nil { diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 8e3b697e48a..1d30ef67cdd 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -80,6 +80,7 @@ var defaultAPIConfig = &ReplicaConfig{ }, ChangefeedErrorStuckDuration: &JSONDuration{*config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration}, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, } func TestDefaultReplicaConfig(t *testing.T) { diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 5bdb8a8ca46..873642015a9 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/format" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" @@ -426,7 +427,13 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) { // addSpecialComment translate tidb feature to comment func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) { - stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate) + p := parser.New() + mode, err := mysql.GetSQLMode(s.info.Config.SQLMode) + if err != nil { + return "", errors.Trace(err) + } + p.SetSQLMode(mode) + stms, _, err := p.Parse(ddl.Query, ddl.Charset, ddl.Collate) if err != nil { return "", errors.Trace(err) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 35caa03f18d..af9dd8d23e6 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -81,7 +81,8 @@ const ( "integrity-check-level": "none", "corruption-handle-level": "warn" }, - "changefeed-error-stuck-duration": 1800000000000 + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" }` testCfgTestServerConfigMarshal = `{ @@ -308,7 +309,8 @@ const ( "integrity-check-level": "none", "corruption-handle-level": "warn" }, - "changefeed-error-stuck-duration": 1800000000000 + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" }` testCfgTestReplicaConfigMarshal2 = `{ @@ -453,6 +455,7 @@ const ( "integrity-check-level": "none", "corruption-handle-level": "warn" }, - "changefeed-error-stuck-duration": 1800000000000 + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" }` ) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 2c8f37f6185..889d34aed75 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/pkg/config/outdated" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" @@ -38,6 +39,7 @@ const ( // minSyncPointRetention is the minimum of SyncPointRetention can be set. minSyncPointRetention = time.Hour * 1 minChangeFeedErrorStuckDuration = time.Minute * 30 + defaultSQLMode = mysql.DefaultSQLMode ) var defaultReplicaConfig = &ReplicaConfig{ @@ -88,6 +90,7 @@ var defaultReplicaConfig = &ReplicaConfig{ CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn, }, ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30), + SQLMode: defaultSQLMode, } // GetDefaultReplicaConfig returns the default replica config. @@ -139,6 +142,7 @@ type replicaConfig struct { // Integrity is only available when the downstream is MQ. Integrity *integrity.Config `toml:"integrity" json:"integrity"` ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` + SQLMode string `toml:"sql-mode" json:"sql-mode"` } // Value implements the driver.Valuer interface diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index 8bae5d2e913..6015dda85ad 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -14,6 +14,7 @@ package filter import ( + "fmt" "strings" "sync" @@ -21,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" @@ -77,10 +79,16 @@ func newExprFilterRule( // verifyAndInitRule will verify and init the rule. // It should only be called in dmlExprFilter's verify method. -func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo) error { +func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo, sqlMode string) error { // verify expression filter rule syntax. p := parser.New() - _, _, err := p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr)) + mode, err := mysql.GetSQLMode(sqlMode) + if err != nil { + log.Error("failed to get sql mode", zap.Error(err)) + return cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode)) + } + p.SetSQLMode(mode) + _, _, err = p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr)) if err != nil { log.Error("failed to parse expression", zap.Error(err)) return cerror.ErrExpressionParseFailed. @@ -347,14 +355,18 @@ func getColumnFromError(err error) string { // dmlExprFilter is a filter that filters DML events by SQL expression. type dmlExprFilter struct { - rules []*dmlExprFilterRule + rules []*dmlExprFilterRule + sqlMODE string } func newExprFilter( timezone string, cfg *config.FilterConfig, + sqlMODE string, ) (*dmlExprFilter, error) { - res := &dmlExprFilter{} + res := &dmlExprFilter{ + sqlMODE: sqlMODE, + } sessCtx := utils.NewSessionCtx(map[string]string{ "time_zone": timezone, }) @@ -382,7 +394,7 @@ func (f *dmlExprFilter) addRule( // verify checks if all rules in this filter is valid. func (f *dmlExprFilter) verify(tableInfos []*model.TableInfo) error { for _, rule := range f.rules { - err := rule.verify(tableInfos) + err := rule.verify(tableInfos, f.sqlMODE) if err != nil { log.Error("failed to verify expression filter rule", zap.Error(err)) return errors.Trace(err) diff --git a/pkg/filter/expr_filter_bench_test.go b/pkg/filter/expr_filter_bench_test.go index 91c3fe903cc..62263e8e2f8 100644 --- a/pkg/filter/expr_filter_bench_test.go +++ b/pkg/filter/expr_filter_bench_test.go @@ -57,7 +57,7 @@ func BenchmarkSkipDML(b *testing.B) { sessCtx := utils.NewSessionCtx(map[string]string{ "time_zone": "", }) - f, err := newExprFilter("", cfg) + f, err := newExprFilter("", cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(b, err) type innerCase struct { diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go index 372ace442eb..66fe1f02aa8 100644 --- a/pkg/filter/expr_filter_test.go +++ b/pkg/filter/expr_filter_test.go @@ -324,7 +324,7 @@ func TestShouldSkipDMLBasic(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns) @@ -441,7 +441,7 @@ func TestShouldSkipDMLError(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns) @@ -634,7 +634,7 @@ func TestShouldSkipDMLTableUpdated(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { if c.updateDDl != "" { @@ -754,7 +754,7 @@ func TestVerify(t *testing.T) { ti := helper.execDDL(ddl) tableInfos = append(tableInfos, ti) } - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) err = f.verify(tableInfos) require.True(t, errors.ErrorEqual(tc.err, err), "case: %+v", tc, err) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index d5674475fb0..a964c68e6ea 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -102,11 +102,11 @@ func NewFilter(cfg *config.ReplicaConfig, tz string) (Filter, error) { f = tfilter.CaseInsensitive(f) } - dmlExprFilter, err := newExprFilter(tz, cfg.Filter) + dmlExprFilter, err := newExprFilter(tz, cfg.Filter, cfg.SQLMode) if err != nil { return nil, err } - sqlEventFilter, err := newSQLEventFilter(cfg.Filter) + sqlEventFilter, err := newSQLEventFilter(cfg.Filter, cfg.SQLMode) if err != nil { return nil, err } diff --git a/pkg/filter/sql_event_filter.go b/pkg/filter/sql_event_filter.go index d9e80d6233a..fdf3ab885bf 100644 --- a/pkg/filter/sql_event_filter.go +++ b/pkg/filter/sql_event_filter.go @@ -14,6 +14,7 @@ package filter import ( + "fmt" "sync" "github.com/pingcap/errors" @@ -21,6 +22,7 @@ import ( bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb/parser" timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" tfilter "github.com/pingcap/tidb/util/table-filter" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -107,9 +109,17 @@ type sqlEventFilter struct { rules []*sqlEventRule } -func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) { +func newSQLEventFilter(cfg *config.FilterConfig, sqlMode string) (*sqlEventFilter, error) { + p := parser.New() + mode, err := mysql.GetSQLMode(sqlMode) + if err != nil { + log.Error("failed to get sql mode", zap.Error(err)) + return nil, cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode)) + } + p.SetSQLMode(mode) + res := &sqlEventFilter{ - ddlParser: parser.New(), + ddlParser: p, } for _, rule := range cfg.EventFilters { if err := res.addRule(rule); err != nil { diff --git a/pkg/filter/sql_event_filter_test.go b/pkg/filter/sql_event_filter_test.go index 1fa03bedcc1..9e9707d0728 100644 --- a/pkg/filter/sql_event_filter_test.go +++ b/pkg/filter/sql_event_filter_test.go @@ -171,7 +171,7 @@ func TestShouldSkipDDL(t *testing.T) { } for _, tc := range testCases { - f, err := newSQLEventFilter(tc.cfg) + f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.True(t, errors.ErrorEqual(err, tc.err), "case: %+s", err) for _, c := range tc.cases { ddl := &model.DDLEvent{ @@ -298,7 +298,7 @@ func TestShouldSkipDML(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() - f, err := newSQLEventFilter(tc.cfg) + f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.NoError(t, err) for _, c := range tc.cases { event := &model.RowChangedEvent{ diff --git a/pkg/filter/utils_test.go b/pkg/filter/utils_test.go index 6207ce974a6..c494ae4c9bd 100644 --- a/pkg/filter/utils_test.go +++ b/pkg/filter/utils_test.go @@ -14,12 +14,14 @@ package filter import ( + "fmt" "testing" "github.com/pingcap/log" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb/parser" timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" tifilter "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -174,3 +176,60 @@ func TestDDLToTypeSpecialDDL(t *testing.T) { require.Equal(t, c.evenType, et, "case%v", c.ddl) } } + +func TestToDDLEventWithSQLMode(t *testing.T) { + cases := []struct { + name string + query string + jobTp timodel.ActionType + sqlMode string // sql mode + expect bf.EventType + errMsg string + }{ + { + name: "create table", + query: "create table t1(id int primary key)", + jobTp: timodel.ActionCreateTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.CreateTable, + }, + { + name: "drop table", + query: "drop table t1", + jobTp: timodel.ActionDropTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.DropTable, + }, + { // "" in table name or column name are not supported when sqlMode is set to ANSI_QUOTES + name: "create table 2", + query: `create table "t1" ("id" int primary key)`, + jobTp: timodel.ActionCreateTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.CreateTable, + errMsg: "ErrConvertDDLToEventTypeFailed", + }, + { // "" in table name or column name are supported when sqlMode is set to ANSI_QUOTES + name: "create table 3", + query: `create table "t1" ("id" int primary key)`, + jobTp: timodel.ActionCreateTable, + sqlMode: fmt.Sprint(config.GetDefaultReplicaConfig().SQLMode + ",ANSI_QUOTES"), + expect: bf.CreateTable, + }, + } + for _, c := range cases { + innerCase := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + p := parser.New() + mode, err := mysql.GetSQLMode(innerCase.sqlMode) + require.NoError(t, err) + p.SetSQLMode(mode) + tp, err := ddlToEventType(p, innerCase.query, innerCase.jobTp) + if innerCase.errMsg != "" { + require.Contains(t, err.Error(), innerCase.errMsg, innerCase.name) + } else { + require.Equal(t, innerCase.expect, tp) + } + }) + } +} diff --git a/scripts/check-diff-line-width.sh b/scripts/check-diff-line-width.sh index af65760cc44..a034d0a2bf6 100755 --- a/scripts/check-diff-line-width.sh +++ b/scripts/check-diff-line-width.sh @@ -27,6 +27,7 @@ git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \ -- ':(exclude)*_gen.go' \ -- ':(exclude)*_gen_test.go' \ -- ':(exclude)*_mock.go' \ + -- ':(exclude)*_test_data.go' \ -- ':(exclude)*.pb.go' | grep -E '^\+' | grep -vE '^\+\+\+' | grep -vE 'json:' | grep -vE 'toml:' | sed 's/\t/ /g' | From 84e06ebbd6943a14bd17cab108f6516ba65a9049 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 1 Nov 2023 12:02:12 +0800 Subject: [PATCH 2/4] resolve comments --- cdc/owner/ddl_sink.go | 5 +++++ pkg/config/replica_config.go | 7 ++++++- pkg/filter/utils_test.go | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 873642015a9..f06d01e6f3a 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -428,6 +428,11 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) { // addSpecialComment translate tidb feature to comment func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) { p := parser.New() + // We need to use the correct SQL mode to parse the DDL query. + // Otherwise, the parser may fail to parse the DDL query. + // For example, it is needed to parse the following DDL query: + // `alter table "t" add column "c" int default 1;` + // by adding `ANSI_QUOTES` to the SQL mode. mode, err := mysql.GetSQLMode(s.info.Config.SQLMode) if err != nil { return "", errors.Trace(err) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 889d34aed75..79b08a56b57 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -39,7 +39,12 @@ const ( // minSyncPointRetention is the minimum of SyncPointRetention can be set. minSyncPointRetention = time.Hour * 1 minChangeFeedErrorStuckDuration = time.Minute * 30 - defaultSQLMode = mysql.DefaultSQLMode + // The default SQL Mode of TiDB: "ONLY_FULL_GROUP_BY, + // STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO, + // NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" + // Note: The SQL Mode of TiDB is not the same as ORACLE. + // If you want to use the same SQL Mode as ORACLE, you need to add "ORACLE" to the SQL Mode. + defaultSQLMode = mysql.DefaultSQLMode ) var defaultReplicaConfig = &ReplicaConfig{ diff --git a/pkg/filter/utils_test.go b/pkg/filter/utils_test.go index c494ae4c9bd..c3441e10877 100644 --- a/pkg/filter/utils_test.go +++ b/pkg/filter/utils_test.go @@ -178,6 +178,7 @@ func TestDDLToTypeSpecialDDL(t *testing.T) { } func TestToDDLEventWithSQLMode(t *testing.T) { + t.Parallel() cases := []struct { name string query string From c712b4bed6a913a98fc36f69d04450d71bfda33c Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 2 Nov 2023 11:17:56 +0800 Subject: [PATCH 3/4] fix error --- cdc/model/changefeed.go | 3 +++ cdc/owner/ddl_sink_test.go | 9 ++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index fec7bbed7e7..fbf9425953b 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -334,6 +334,9 @@ func (info *ChangeFeedInfo) VerifyAndComplete() { if info.Config.ChangefeedErrorStuckDuration == nil { info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration } + if info.Config.SQLMode == "" { + info.Config.SQLMode = defaultConfig.SQLMode + } info.RmUnusedFields() } diff --git a/cdc/owner/ddl_sink_test.go b/cdc/owner/ddl_sink_test.go index be3c3ee589d..eaca597a5ed 100644 --- a/cdc/owner/ddl_sink_test.go +++ b/cdc/owner/ddl_sink_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/stretchr/testify/require" @@ -61,7 +62,13 @@ func (m *mockSink) GetDDL() *model.DDLEvent { func newDDLSink4Test(reportErr func(err error), reportWarn func(err error)) (DDLSink, *mockSink) { mockSink := &mockSink{} - ddlSink := newDDLSink(model.DefaultChangeFeedID("changefeed-test"), &model.ChangeFeedInfo{}, reportErr, reportWarn) + ddlSink := newDDLSink( + model.DefaultChangeFeedID("changefeed-test"), + &model.ChangeFeedInfo{ + Config: config.GetDefaultReplicaConfig(), + }, + reportErr, + reportWarn) ddlSink.(*ddlSinkImpl).sinkInitHandler = func(ctx context.Context, s *ddlSinkImpl) error { s.sink = mockSink return nil From e4c3bfa792a7da589728d5f3419fbc3e304a202d Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 2 Nov 2023 14:16:57 +0800 Subject: [PATCH 4/4] fix failed ut cases --- cdc/owner/ddl_sink_test.go | 3 +++ pkg/orchestrator/reactor_state_test.go | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/cdc/owner/ddl_sink_test.go b/cdc/owner/ddl_sink_test.go index eaca597a5ed..52883f2e1c3 100644 --- a/cdc/owner/ddl_sink_test.go +++ b/cdc/owner/ddl_sink_test.go @@ -552,6 +552,9 @@ func TestAddSpecialComment(t *testing.T) { } s := &ddlSinkImpl{} + s.info = &model.ChangeFeedInfo{ + Config: config.GetDefaultReplicaConfig(), + } for _, ca := range testCase { re, err := s.addSpecialComment(ca.event) require.Nil(t, err) diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index fd0b29f3a7c..61dc26f4b7e 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -123,6 +123,7 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -177,6 +178,7 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -236,6 +238,7 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -330,6 +333,7 @@ func TestPatchInfo(t *testing.T) { Scheduler: defaultConfig.Scheduler, Integrity: defaultConfig.Integrity, ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, + SQLMode: defaultConfig.SQLMode, }, } cfInfo.RmUnusedFields() @@ -352,6 +356,7 @@ func TestPatchInfo(t *testing.T) { Scheduler: defaultConfig.Scheduler, Integrity: defaultConfig.Integrity, ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, + SQLMode: defaultConfig.SQLMode, }, } cfInfo.RmUnusedFields()