From ea8c990d9a9e3ef8295f3e1a8f6cc9c70074b650 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Thu, 2 Nov 2023 16:32:09 +0800 Subject: [PATCH] This is an automated cherry-pick of #9941 Signed-off-by: ti-chi-bot --- cdc/api/v2/model.go | 9 +++- cdc/api/v2/model_test.go | 1 + cdc/model/changefeed.go | 74 ++++++++++++++++++++++++++ cdc/owner/ddl_sink.go | 17 +++++- cdc/owner/ddl_sink_test.go | 12 ++++- pkg/config/config_test_data.go | 9 ++-- pkg/config/replica_config.go | 37 +++++++++++++ pkg/filter/expr_filter.go | 22 ++++++-- pkg/filter/expr_filter_bench_test.go | 2 +- pkg/filter/expr_filter_test.go | 8 +-- pkg/filter/filter.go | 4 +- pkg/filter/sql_event_filter.go | 14 ++++- pkg/filter/sql_event_filter_test.go | 4 +- pkg/filter/utils_test.go | 60 +++++++++++++++++++++ pkg/orchestrator/reactor_state_test.go | 5 ++ scripts/check-diff-line-width.sh | 1 + 16 files changed, 257 insertions(+), 22 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 3206f2f1899..0c546d6ab76 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -193,7 +193,8 @@ type ReplicaConfig struct { Consistent *ConsistentConfig `json:"consistent,omitempty"` Scheduler *ChangefeedSchedulerConfig `json:"scheduler"` Integrity *IntegrityConfig `json:"integrity"` - ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty" swaggertype:"string"` + ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"` + SQLMode string `json:"sql_mode,omitempty"` } // ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig @@ -211,6 +212,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.ForceReplicate = c.ForceReplicate res.CheckGCSafePoint = c.CheckGCSafePoint res.EnableSyncPoint = c.EnableSyncPoint +<<<<<<< HEAD +======= + res.IgnoreIneligibleTable = c.IgnoreIneligibleTable + res.SQLMode = c.SQLMode +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) if c.SyncPointInterval != nil { res.SyncPointInterval = c.SyncPointInterval.duration } @@ -444,6 +450,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { SyncPointInterval: &JSONDuration{cloned.SyncPointInterval}, SyncPointRetention: &JSONDuration{cloned.SyncPointRetention}, BDRMode: cloned.BDRMode, + SQLMode: cloned.SQLMode, } if cloned.Filter != nil { diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 2ed6dc97f84..d9517dafbf0 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -78,6 +78,7 @@ var defaultAPIConfig = &ReplicaConfig{ }, ChangefeedErrorStuckDuration: &JSONDuration{*config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration}, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, } func TestDefaultReplicaConfig(t *testing.T) { diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 0ed708b388a..c5aff9da0e3 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -336,6 +336,80 @@ func (info *ChangeFeedInfo) VerifyAndComplete() { if info.Config.ChangefeedErrorStuckDuration == nil { info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration } +<<<<<<< HEAD +======= + if info.Config.SQLMode == "" { + info.Config.SQLMode = defaultConfig.SQLMode + } + info.RmUnusedFields() +} + +// RmUnusedFields removes unnecessary fields based on the downstream type and +// the protocol. 
Since we utilize a common changefeed configuration template, +// certain fields may not be utilized for certain protocols. +func (info *ChangeFeedInfo) RmUnusedFields() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + log.Warn( + "failed to parse the sink uri", + zap.Error(err), + zap.Any("sinkUri", info.SinkURI), + ) + return + } + if !sink.IsMQScheme(uri.Scheme) { + info.rmMQOnlyFields() + } else { + // remove schema registry for MQ downstream with + // protocol other than avro + if util.GetOrZero(info.Config.Sink.Protocol) != config.ProtocolAvro.String() { + info.Config.Sink.SchemaRegistry = nil + } + } + + if !sink.IsStorageScheme(uri.Scheme) { + info.rmStorageOnlyFields() + } + + if !sink.IsMySQLCompatibleScheme(uri.Scheme) { + info.rmDBOnlyFields() + } else { + // remove fields only being used by MQ and Storage downstream + info.Config.Sink.Protocol = nil + info.Config.Sink.Terminator = nil + } +} + +func (info *ChangeFeedInfo) rmMQOnlyFields() { + log.Info("since the downstream is not a MQ, remove MQ only fields", + zap.String("namespace", info.Namespace), + zap.String("changefeed", info.ID)) + info.Config.Sink.DispatchRules = nil + info.Config.Sink.SchemaRegistry = nil + info.Config.Sink.EncoderConcurrency = nil + info.Config.Sink.EnableKafkaSinkV2 = nil + info.Config.Sink.OnlyOutputUpdatedColumns = nil + info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil + info.Config.Sink.KafkaConfig = nil +} + +func (info *ChangeFeedInfo) rmStorageOnlyFields() { + info.Config.Sink.CSVConfig = nil + info.Config.Sink.DateSeparator = nil + info.Config.Sink.EnablePartitionSeparator = nil + info.Config.Sink.FileIndexWidth = nil + info.Config.Sink.CloudStorageConfig = nil +} + +func (info *ChangeFeedInfo) rmDBOnlyFields() { + info.Config.EnableSyncPoint = nil + info.Config.BDRMode = nil + info.Config.SyncPointInterval = nil + info.Config.SyncPointRetention = nil + info.Config.Consistent = nil + info.Config.Sink.SafeMode = nil + info.Config.Sink.MySQLConfig = nil +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) } // FixIncompatible fixes incompatible changefeed meta info. diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index a02b1feaca1..58754706738 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -25,7 +25,11 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/format" +<<<<<<< HEAD "github.com/pingcap/tiflow/cdc/contextutil" +======= + "github.com/pingcap/tidb/parser/mysql" +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" @@ -429,7 +433,18 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) { // addSpecialComment translate tidb feature to comment func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) { - stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate) + p := parser.New() + // We need to use the correct SQL mode to parse the DDL query. + // Otherwise, the parser may fail to parse the DDL query. + // For example, it is needed to parse the following DDL query: + // `alter table "t" add column "c" int default 1;` + // by adding `ANSI_QUOTES` to the SQL mode. 
+ mode, err := mysql.GetSQLMode(s.info.Config.SQLMode) + if err != nil { + return "", errors.Trace(err) + } + p.SetSQLMode(mode) + stms, _, err := p.Parse(ddl.Query, ddl.Charset, ddl.Collate) if err != nil { return "", errors.Trace(err) } diff --git a/cdc/owner/ddl_sink_test.go b/cdc/owner/ddl_sink_test.go index be3c3ee589d..52883f2e1c3 100644 --- a/cdc/owner/ddl_sink_test.go +++ b/cdc/owner/ddl_sink_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/stretchr/testify/require" @@ -61,7 +62,13 @@ func (m *mockSink) GetDDL() *model.DDLEvent { func newDDLSink4Test(reportErr func(err error), reportWarn func(err error)) (DDLSink, *mockSink) { mockSink := &mockSink{} - ddlSink := newDDLSink(model.DefaultChangeFeedID("changefeed-test"), &model.ChangeFeedInfo{}, reportErr, reportWarn) + ddlSink := newDDLSink( + model.DefaultChangeFeedID("changefeed-test"), + &model.ChangeFeedInfo{ + Config: config.GetDefaultReplicaConfig(), + }, + reportErr, + reportWarn) ddlSink.(*ddlSinkImpl).sinkInitHandler = func(ctx context.Context, s *ddlSinkImpl) error { s.sink = mockSink return nil @@ -545,6 +552,9 @@ func TestAddSpecialComment(t *testing.T) { } s := &ddlSinkImpl{} + s.info = &model.ChangeFeedInfo{ + Config: config.GetDefaultReplicaConfig(), + } for _, ca := range testCase { re, err := s.addSpecialComment(ca.event) require.Nil(t, err) diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 5a44aac5aac..e418eee1439 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -71,7 +71,8 @@ const ( "integrity-check-level": "none", "corruption-handle-level": "warn" }, - "changefeed-error-stuck-duration": 1800000000000 + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" }` testCfgTestServerConfigMarshal = `{ @@ -287,7 +288,8 @@ const ( "integrity-check-level": "none", "corruption-handle-level": "warn" }, - "changefeed-error-stuck-duration": 1800000000000 + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" }` testCfgTestReplicaConfigMarshal2 = `{ @@ -418,6 +420,7 @@ const ( "integrity-check-level": "none", "corruption-handle-level": "warn" }, - "changefeed-error-stuck-duration": 1800000000000 + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" }` ) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index c08b1bc6879..6f81228ff52 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/pkg/config/outdated" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" @@ -37,6 +38,12 @@ const ( // minSyncPointRetention is the minimum of SyncPointRetention can be set. 
minSyncPointRetention = time.Hour * 1 minChangeFeedErrorStuckDuration = time.Minute * 30 + // The default SQL Mode of TiDB: "ONLY_FULL_GROUP_BY, + // STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO, + // NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" + // Note: The SQL Mode of TiDB is not the same as ORACLE. + // If you want to use the same SQL Mode as ORACLE, you need to add "ORACLE" to the SQL Mode. + defaultSQLMode = mysql.DefaultSQLMode ) var defaultReplicaConfig = &ReplicaConfig{ @@ -85,6 +92,7 @@ var defaultReplicaConfig = &ReplicaConfig{ CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn, }, ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30), + SQLMode: defaultSQLMode, } // GetDefaultReplicaConfig returns the default replica config. @@ -125,9 +133,38 @@ type replicaConfig struct { Sink *SinkConfig `toml:"sink" json:"sink"` Consistent *ConsistentConfig `toml:"consistent" json:"consistent"` // Scheduler is the configuration for scheduler. +<<<<<<< HEAD Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` Integrity *integrity.Config `toml:"integrity" json:"integrity"` ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` +======= + Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` + // Integrity is only available when the downstream is MQ. + Integrity *integrity.Config `toml:"integrity" json:"integrity"` + ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` + SQLMode string `toml:"sql-mode" json:"sql-mode"` +} + +// Value implements the driver.Valuer interface +func (c ReplicaConfig) Value() (driver.Value, error) { + cfg, err := c.Marshal() + if err != nil { + return nil, err + } + + // TODO: refactor the meaningless type conversion. + return []byte(cfg), nil +} + +// Scan implements the sql.Scanner interface +func (c *ReplicaConfig) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + + return c.UnmarshalJSON(b) +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) } // Marshal returns the json marshal format of a ReplicationConfig diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index 8bae5d2e913..6015dda85ad 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -14,6 +14,7 @@ package filter import ( + "fmt" "strings" "sync" @@ -21,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" @@ -77,10 +79,16 @@ func newExprFilterRule( // verifyAndInitRule will verify and init the rule. // It should only be called in dmlExprFilter's verify method. -func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo) error { +func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo, sqlMode string) error { // verify expression filter rule syntax. 
p := parser.New() - _, _, err := p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr)) + mode, err := mysql.GetSQLMode(sqlMode) + if err != nil { + log.Error("failed to get sql mode", zap.Error(err)) + return cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode)) + } + p.SetSQLMode(mode) + _, _, err = p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr)) if err != nil { log.Error("failed to parse expression", zap.Error(err)) return cerror.ErrExpressionParseFailed. @@ -347,14 +355,18 @@ func getColumnFromError(err error) string { // dmlExprFilter is a filter that filters DML events by SQL expression. type dmlExprFilter struct { - rules []*dmlExprFilterRule + rules []*dmlExprFilterRule + sqlMODE string } func newExprFilter( timezone string, cfg *config.FilterConfig, + sqlMODE string, ) (*dmlExprFilter, error) { - res := &dmlExprFilter{} + res := &dmlExprFilter{ + sqlMODE: sqlMODE, + } sessCtx := utils.NewSessionCtx(map[string]string{ "time_zone": timezone, }) @@ -382,7 +394,7 @@ func (f *dmlExprFilter) addRule( // verify checks if all rules in this filter is valid. func (f *dmlExprFilter) verify(tableInfos []*model.TableInfo) error { for _, rule := range f.rules { - err := rule.verify(tableInfos) + err := rule.verify(tableInfos, f.sqlMODE) if err != nil { log.Error("failed to verify expression filter rule", zap.Error(err)) return errors.Trace(err) diff --git a/pkg/filter/expr_filter_bench_test.go b/pkg/filter/expr_filter_bench_test.go index 91c3fe903cc..62263e8e2f8 100644 --- a/pkg/filter/expr_filter_bench_test.go +++ b/pkg/filter/expr_filter_bench_test.go @@ -57,7 +57,7 @@ func BenchmarkSkipDML(b *testing.B) { sessCtx := utils.NewSessionCtx(map[string]string{ "time_zone": "", }) - f, err := newExprFilter("", cfg) + f, err := newExprFilter("", cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(b, err) type innerCase struct { diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go index 372ace442eb..66fe1f02aa8 100644 --- a/pkg/filter/expr_filter_test.go +++ b/pkg/filter/expr_filter_test.go @@ -324,7 +324,7 @@ func TestShouldSkipDMLBasic(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns) @@ -441,7 +441,7 @@ func TestShouldSkipDMLError(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns) @@ -634,7 +634,7 @@ func TestShouldSkipDMLTableUpdated(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { if c.updateDDl != "" { @@ -754,7 +754,7 @@ func TestVerify(t *testing.T) { ti := helper.execDDL(ddl) tableInfos = append(tableInfos, ti) } - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) err = f.verify(tableInfos) require.True(t, errors.ErrorEqual(tc.err, err), "case: %+v", tc, err) 
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 071251fdc28..269d704bbff 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -100,11 +100,11 @@ func NewFilter(cfg *config.ReplicaConfig, tz string) (Filter, error) { f = tfilter.CaseInsensitive(f) } - dmlExprFilter, err := newExprFilter(tz, cfg.Filter) + dmlExprFilter, err := newExprFilter(tz, cfg.Filter, cfg.SQLMode) if err != nil { return nil, err } - sqlEventFilter, err := newSQLEventFilter(cfg.Filter) + sqlEventFilter, err := newSQLEventFilter(cfg.Filter, cfg.SQLMode) if err != nil { return nil, err } diff --git a/pkg/filter/sql_event_filter.go b/pkg/filter/sql_event_filter.go index 7e063c2c47f..bfc4c17d8db 100644 --- a/pkg/filter/sql_event_filter.go +++ b/pkg/filter/sql_event_filter.go @@ -14,6 +14,7 @@ package filter import ( + "fmt" "sync" "github.com/pingcap/errors" @@ -21,6 +22,7 @@ import ( bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb/parser" timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" tfilter "github.com/pingcap/tidb/util/table-filter" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -107,9 +109,17 @@ type sqlEventFilter struct { rules []*sqlEventRule } -func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) { +func newSQLEventFilter(cfg *config.FilterConfig, sqlMode string) (*sqlEventFilter, error) { + p := parser.New() + mode, err := mysql.GetSQLMode(sqlMode) + if err != nil { + log.Error("failed to get sql mode", zap.Error(err)) + return nil, cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode)) + } + p.SetSQLMode(mode) + res := &sqlEventFilter{ - ddlParser: parser.New(), + ddlParser: p, } for _, rule := range cfg.EventFilters { diff --git a/pkg/filter/sql_event_filter_test.go b/pkg/filter/sql_event_filter_test.go index 1fa03bedcc1..9e9707d0728 100644 --- a/pkg/filter/sql_event_filter_test.go +++ b/pkg/filter/sql_event_filter_test.go @@ -171,7 +171,7 @@ func TestShouldSkipDDL(t *testing.T) { } for _, tc := range testCases { - f, err := newSQLEventFilter(tc.cfg) + f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.True(t, errors.ErrorEqual(err, tc.err), "case: %+s", err) for _, c := range tc.cases { ddl := &model.DDLEvent{ @@ -298,7 +298,7 @@ func TestShouldSkipDML(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() - f, err := newSQLEventFilter(tc.cfg) + f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.NoError(t, err) for _, c := range tc.cases { event := &model.RowChangedEvent{ diff --git a/pkg/filter/utils_test.go b/pkg/filter/utils_test.go index bedc2e7fede..409f1e14d0a 100644 --- a/pkg/filter/utils_test.go +++ b/pkg/filter/utils_test.go @@ -14,11 +14,13 @@ package filter import ( + "fmt" "testing" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb/parser" timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" tifilter "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -172,3 +174,61 @@ func TestDDLToTypeSpecialDDL(t *testing.T) { require.Equal(t, c.evenType, et, "case%v", c.ddl) } } + +func TestToDDLEventWithSQLMode(t *testing.T) { + t.Parallel() + cases := []struct { + name string + query string + jobTp timodel.ActionType + sqlMode string // sql mode + expect bf.EventType + errMsg string + }{ + { + name: "create 
table", + query: "create table t1(id int primary key)", + jobTp: timodel.ActionCreateTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.CreateTable, + }, + { + name: "drop table", + query: "drop table t1", + jobTp: timodel.ActionDropTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.DropTable, + }, + { // "" in table name or column name are not supported when sqlMode is set to ANSI_QUOTES + name: "create table 2", + query: `create table "t1" ("id" int primary key)`, + jobTp: timodel.ActionCreateTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.CreateTable, + errMsg: "ErrConvertDDLToEventTypeFailed", + }, + { // "" in table name or column name are supported when sqlMode is set to ANSI_QUOTES + name: "create table 3", + query: `create table "t1" ("id" int primary key)`, + jobTp: timodel.ActionCreateTable, + sqlMode: fmt.Sprint(config.GetDefaultReplicaConfig().SQLMode + ",ANSI_QUOTES"), + expect: bf.CreateTable, + }, + } + for _, c := range cases { + innerCase := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + p := parser.New() + mode, err := mysql.GetSQLMode(innerCase.sqlMode) + require.NoError(t, err) + p.SetSQLMode(mode) + tp, err := ddlToEventType(p, innerCase.query, innerCase.jobTp) + if innerCase.errMsg != "" { + require.Contains(t, err.Error(), innerCase.errMsg, innerCase.name) + } else { + require.Equal(t, innerCase.expect, tp) + } + }) + } +} diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index ed403faf1b9..64998c6d194 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -130,6 +130,7 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -185,6 +186,7 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -245,6 +247,7 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. 
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -336,6 +339,7 @@ func TestPatchInfo(t *testing.T) { Scheduler: defaultConfig.Scheduler, Integrity: defaultConfig.Integrity, ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, + SQLMode: defaultConfig.SQLMode, }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -355,6 +359,7 @@ func TestPatchInfo(t *testing.T) { Scheduler: defaultConfig.Scheduler, Integrity: defaultConfig.Integrity, ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, + SQLMode: defaultConfig.SQLMode, }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { diff --git a/scripts/check-diff-line-width.sh b/scripts/check-diff-line-width.sh index af65760cc44..a034d0a2bf6 100755 --- a/scripts/check-diff-line-width.sh +++ b/scripts/check-diff-line-width.sh @@ -27,6 +27,7 @@ git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \ -- ':(exclude)*_gen.go' \ -- ':(exclude)*_gen_test.go' \ -- ':(exclude)*_mock.go' \ + -- ':(exclude)*_test_data.go' \ -- ':(exclude)*.pb.go' | grep -E '^\+' | grep -vE '^\+\+\+' | grep -vE 'json:' | grep -vE 'toml:' | sed 's/\t/ /g' |
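For readers reviewing this cherry-pick, here is a minimal standalone sketch (not part of the patch) of why the new changefeed-level sql-mode option matters for DDL parsing: under TiDB's default SQL mode the parser rejects double-quoted identifiers, while appending ANSI_QUOTES makes the same statement parse. It reuses only calls that already appear in the diff (parser.New, SetSQLMode, mysql.GetSQLMode, Parse); the parseWith helper and the main wrapper are illustrative, and the import paths assume the same TiDB parser module version used on this branch.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/parser"
	"github.com/pingcap/tidb/parser/mysql"
)

// parseWith parses a DDL statement under the given SQL mode string and
// returns the parse error, if any. (Illustrative helper, not in the patch.)
func parseWith(sqlMode, query string) error {
	p := parser.New()
	mode, err := mysql.GetSQLMode(sqlMode)
	if err != nil {
		return err
	}
	p.SetSQLMode(mode)
	_, _, err = p.Parse(query, "", "")
	return err
}

func main() {
	query := `create table "t1" ("id" int primary key)`

	// Under TiDB's default SQL mode, "t1" is a string literal, so parsing fails.
	fmt.Println("default mode:", parseWith(mysql.DefaultSQLMode, query))

	// With ANSI_QUOTES appended (which the new sql-mode changefeed option allows),
	// double-quoted identifiers are accepted and the parse error is nil.
	fmt.Println("with ANSI_QUOTES:", parseWith(mysql.DefaultSQLMode+",ANSI_QUOTES", query))
}

This mirrors the TestToDDLEventWithSQLMode cases added in pkg/filter/utils_test.go, where the default-mode case is expected to fail on the double-quoted DDL and the ANSI_QUOTES variant is expected to succeed.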