diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 33a2b885c2d..fd09bc95e4d 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -185,10 +185,21 @@ type ReplicaConfig struct { SyncPointInterval *JSONDuration `json:"sync_point_interval" swaggertype:"string"` SyncPointRetention *JSONDuration `json:"sync_point_retention" swaggertype:"string"` +<<<<<<< HEAD Filter *FilterConfig `json:"filter"` Mounter *MounterConfig `json:"mounter"` Sink *SinkConfig `json:"sink"` Consistent *ConsistentConfig `json:"consistent"` +======= + Filter *FilterConfig `json:"filter"` + Mounter *MounterConfig `json:"mounter"` + Sink *SinkConfig `json:"sink"` + Consistent *ConsistentConfig `json:"consistent,omitempty"` + Scheduler *ChangefeedSchedulerConfig `json:"scheduler"` + Integrity *IntegrityConfig `json:"integrity"` + ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"` + SQLMode string `json:"sql_mode,omitempty"` +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) } // ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig @@ -206,6 +217,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.ForceReplicate = c.ForceReplicate res.CheckGCSafePoint = c.CheckGCSafePoint res.EnableSyncPoint = c.EnableSyncPoint +<<<<<<< HEAD +======= + res.IgnoreIneligibleTable = c.IgnoreIneligibleTable + res.SQLMode = c.SQLMode +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) if c.SyncPointInterval != nil { res.SyncPointInterval = c.SyncPointInterval.duration } @@ -348,6 +364,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { SyncPointInterval: &JSONDuration{cloned.SyncPointInterval}, SyncPointRetention: &JSONDuration{cloned.SyncPointRetention}, BDRMode: cloned.BDRMode, + SQLMode: cloned.SQLMode, } if cloned.Filter != nil { diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 9382356a670..0a2cff2396d 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -62,6 +62,24 @@ var defaultAPIConfig = &ReplicaConfig{ Storage: "", UseFileBackend: false, }, +<<<<<<< HEAD +======= + Scheduler: &ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: config.GetDefaultReplicaConfig(). + Scheduler.EnableTableAcrossNodes, + RegionThreshold: config.GetDefaultReplicaConfig(). + Scheduler.RegionThreshold, + WriteKeyThreshold: config.GetDefaultReplicaConfig(). + Scheduler.WriteKeyThreshold, + }, + Integrity: &IntegrityConfig{ + IntegrityCheckLevel: config.GetDefaultReplicaConfig().Integrity.IntegrityCheckLevel, + CorruptionHandleLevel: config.GetDefaultReplicaConfig().Integrity.CorruptionHandleLevel, + }, + ChangefeedErrorStuckDuration: &JSONDuration{*config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration}, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) } func TestDefaultReplicaConfig(t *testing.T) { diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 3493e3c1e6f..dc7263f575f 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -323,6 +323,90 @@ func (info *ChangeFeedInfo) VerifyAndComplete() { if info.Config.Consistent == nil { info.Config.Consistent = defaultConfig.Consistent } +<<<<<<< HEAD +======= + if info.Config.Scheduler == nil { + info.Config.Scheduler = defaultConfig.Scheduler + } + + if info.Config.Integrity == nil { + info.Config.Integrity = defaultConfig.Integrity + } + if info.Config.ChangefeedErrorStuckDuration == nil { + info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration + } + if info.Config.SQLMode == "" { + info.Config.SQLMode = defaultConfig.SQLMode + } + info.RmUnusedFields() +} + +// RmUnusedFields removes unnecessary fields based on the downstream type and +// the protocol. Since we utilize a common changefeed configuration template, +// certain fields may not be utilized for certain protocols. +func (info *ChangeFeedInfo) RmUnusedFields() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + log.Warn( + "failed to parse the sink uri", + zap.Error(err), + zap.Any("sinkUri", info.SinkURI), + ) + return + } + if !sink.IsMQScheme(uri.Scheme) { + info.rmMQOnlyFields() + } else { + // remove schema registry for MQ downstream with + // protocol other than avro + if util.GetOrZero(info.Config.Sink.Protocol) != config.ProtocolAvro.String() { + info.Config.Sink.SchemaRegistry = nil + } + } + + if !sink.IsStorageScheme(uri.Scheme) { + info.rmStorageOnlyFields() + } + + if !sink.IsMySQLCompatibleScheme(uri.Scheme) { + info.rmDBOnlyFields() + } else { + // remove fields only being used by MQ and Storage downstream + info.Config.Sink.Protocol = nil + info.Config.Sink.Terminator = nil + } +} + +func (info *ChangeFeedInfo) rmMQOnlyFields() { + log.Info("since the downstream is not a MQ, remove MQ only fields", + zap.String("namespace", info.Namespace), + zap.String("changefeed", info.ID)) + info.Config.Sink.DispatchRules = nil + info.Config.Sink.SchemaRegistry = nil + info.Config.Sink.EncoderConcurrency = nil + info.Config.Sink.EnableKafkaSinkV2 = nil + info.Config.Sink.OnlyOutputUpdatedColumns = nil + info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil + info.Config.Sink.KafkaConfig = nil +} + +func (info *ChangeFeedInfo) rmStorageOnlyFields() { + info.Config.Sink.CSVConfig = nil + info.Config.Sink.DateSeparator = nil + info.Config.Sink.EnablePartitionSeparator = nil + info.Config.Sink.FileIndexWidth = nil + info.Config.Sink.CloudStorageConfig = nil +} + +func (info *ChangeFeedInfo) rmDBOnlyFields() { + info.Config.EnableSyncPoint = nil + info.Config.BDRMode = nil + info.Config.SyncPointInterval = nil + info.Config.SyncPointRetention = nil + info.Config.Consistent = nil + info.Config.Sink.SafeMode = nil + info.Config.Sink.MySQLConfig = nil +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) } // FixIncompatible fixes incompatible changefeed meta info. diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index c1c925fa4c9..4a07bb6c356 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -25,7 +25,11 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/format" +<<<<<<< HEAD "github.com/pingcap/tiflow/cdc/contextutil" +======= + "github.com/pingcap/tidb/parser/mysql" +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) "github.com/pingcap/tiflow/cdc/model" sinkv1 "github.com/pingcap/tiflow/cdc/sink" "github.com/pingcap/tiflow/cdc/sink/mysql" @@ -454,7 +458,18 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) { // addSpecialComment translate tidb feature to comment func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) { - stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate) + p := parser.New() + // We need to use the correct SQL mode to parse the DDL query. + // Otherwise, the parser may fail to parse the DDL query. + // For example, it is needed to parse the following DDL query: + // `alter table "t" add column "c" int default 1;` + // by adding `ANSI_QUOTES` to the SQL mode. + mode, err := mysql.GetSQLMode(s.info.Config.SQLMode) + if err != nil { + return "", errors.Trace(err) + } + p.SetSQLMode(mode) + stms, _, err := p.Parse(ddl.Query, ddl.Charset, ddl.Collate) if err != nil { return "", errors.Trace(err) } diff --git a/cdc/owner/ddl_sink_test.go b/cdc/owner/ddl_sink_test.go index 7cd4e6bfda1..663caff97a2 100644 --- a/cdc/owner/ddl_sink_test.go +++ b/cdc/owner/ddl_sink_test.go @@ -22,7 +22,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" +<<<<<<< HEAD "github.com/pingcap/tiflow/cdc/sink" +======= + "github.com/pingcap/tiflow/cdc/sink/ddlsink" + "github.com/pingcap/tiflow/pkg/config" +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/stretchr/testify/require" @@ -65,7 +70,13 @@ func (m *mockSink) GetDDL() *model.DDLEvent { func newDDLSink4Test(reportErr func(err error), reportWarn func(err error)) (DDLSink, *mockSink) { mockSink := &mockSink{} - ddlSink := newDDLSink(model.DefaultChangeFeedID("changefeed-test"), &model.ChangeFeedInfo{}, reportErr, reportWarn) + ddlSink := newDDLSink( + model.DefaultChangeFeedID("changefeed-test"), + &model.ChangeFeedInfo{ + Config: config.GetDefaultReplicaConfig(), + }, + reportErr, + reportWarn) ddlSink.(*ddlSinkImpl).sinkInitHandler = func(ctx context.Context, s *ddlSinkImpl) error { s.sinkV1 = mockSink return nil @@ -487,6 +498,9 @@ func TestAddSpecialComment(t *testing.T) { } s := &ddlSinkImpl{} + s.info = &model.ChangeFeedInfo{ + Config: config.GetDefaultReplicaConfig(), + } for _, ca := range testCase { re, err := s.addSpecialComment(ca.event) require.Nil(t, err) diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 57a5b6bb01e..59a592b66c8 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -62,7 +62,21 @@ const ( "flush-interval": 2000, "storage": "", "use-file-backend": false +<<<<<<< HEAD } +======= + }, + "scheduler": { + "enable-table-across-nodes": false, + "region-threshold": 100000 + }, + "integrity": { + "integrity-check-level": "none", + "corruption-handle-level": "warn" + }, + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }` testCfgTestServerConfigMarshal = `{ @@ -213,7 +227,24 @@ const ( "flush-interval": 2000, "storage": "", "use-file-backend": false +<<<<<<< HEAD } +======= + }, + "scheduler": { + "enable-table-across-nodes": true, + "region-per-span": 0, + "region-threshold": 100001, + "write-key-threshold": 100001, + "region-per-span": 0 + }, + "integrity": { + "integrity-check-level": "none", + "corruption-handle-level": "warn" + }, + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }` testCfgTestReplicaConfigMarshal2 = `{ @@ -273,6 +304,21 @@ const ( "flush-interval": 2000, "storage": "", "use-file-backend": false +<<<<<<< HEAD } +======= + }, + "scheduler": { + "enable-table-across-nodes": true, + "region-threshold": 100001, + "write-key-threshold": 100001 + }, + "integrity": { + "integrity-check-level": "none", + "corruption-handle-level": "warn" + }, + "changefeed-error-stuck-duration": 1800000000000, + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }` ) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index a602056300b..31e7872b09b 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/pkg/config/outdated" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" @@ -33,7 +34,18 @@ const ( // minSyncPointInterval is the minimum of SyncPointInterval can be set. minSyncPointInterval = time.Second * 30 // minSyncPointRetention is the minimum of SyncPointRetention can be set. +<<<<<<< HEAD minSyncPointRetention = time.Hour * 1 +======= + minSyncPointRetention = time.Hour * 1 + minChangeFeedErrorStuckDuration = time.Minute * 30 + // The default SQL Mode of TiDB: "ONLY_FULL_GROUP_BY, + // STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO, + // NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" + // Note: The SQL Mode of TiDB is not the same as ORACLE. + // If you want to use the same SQL Mode as ORACLE, you need to add "ORACLE" to the SQL Mode. + defaultSQLMode = mysql.DefaultSQLMode +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) ) var defaultReplicaConfig = &ReplicaConfig{ @@ -71,6 +83,20 @@ var defaultReplicaConfig = &ReplicaConfig{ Storage: "", UseFileBackend: false, }, +<<<<<<< HEAD +======= + Scheduler: &ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: false, + RegionThreshold: 100_000, + WriteKeyThreshold: 0, + }, + Integrity: &integrity.Config{ + IntegrityCheckLevel: integrity.CheckLevelNone, + CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn, + }, + ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30), + SQLMode: defaultSQLMode, +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) } // GetDefaultReplicaConfig returns the default replica config. @@ -103,6 +129,7 @@ type replicaConfig struct { // BDR(Bidirectional Replication) is a feature that allows users to // replicate data of same tables from TiDB-1 to TiDB-2 and vice versa. // This feature is only available for TiDB. +<<<<<<< HEAD BDRMode bool `toml:"bdr-mode" json:"bdr-mode"` SyncPointInterval time.Duration `toml:"sync-point-interval" json:"sync-point-interval"` SyncPointRetention time.Duration `toml:"sync-point-retention" json:"sync-point-retention"` @@ -110,6 +137,45 @@ type replicaConfig struct { Mounter *MounterConfig `toml:"mounter" json:"mounter"` Sink *SinkConfig `toml:"sink" json:"sink"` Consistent *ConsistentConfig `toml:"consistent" json:"consistent"` +======= + BDRMode *bool `toml:"bdr-mode" json:"bdr-mode,omitempty"` + // SyncPointInterval is only available when the downstream is DB. + SyncPointInterval *time.Duration `toml:"sync-point-interval" json:"sync-point-interval,omitempty"` + // SyncPointRetention is only available when the downstream is DB. + SyncPointRetention *time.Duration `toml:"sync-point-retention" json:"sync-point-retention,omitempty"` + Filter *FilterConfig `toml:"filter" json:"filter"` + Mounter *MounterConfig `toml:"mounter" json:"mounter"` + Sink *SinkConfig `toml:"sink" json:"sink"` + // Consistent is only available for DB downstream with redo feature enabled. + Consistent *ConsistentConfig `toml:"consistent" json:"consistent,omitempty"` + // Scheduler is the configuration for scheduler. + Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` + // Integrity is only available when the downstream is MQ. + Integrity *integrity.Config `toml:"integrity" json:"integrity"` + ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` + SQLMode string `toml:"sql-mode" json:"sql-mode"` +} + +// Value implements the driver.Valuer interface +func (c ReplicaConfig) Value() (driver.Value, error) { + cfg, err := c.Marshal() + if err != nil { + return nil, err + } + + // TODO: refactor the meaningless type conversion. + return []byte(cfg), nil +} + +// Scan implements the sql.Scanner interface +func (c *ReplicaConfig) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + + return c.UnmarshalJSON(b) +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) } // Marshal returns the json marshal format of a ReplicationConfig diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index 8bae5d2e913..6015dda85ad 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -14,6 +14,7 @@ package filter import ( + "fmt" "strings" "sync" @@ -21,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" @@ -77,10 +79,16 @@ func newExprFilterRule( // verifyAndInitRule will verify and init the rule. // It should only be called in dmlExprFilter's verify method. -func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo) error { +func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo, sqlMode string) error { // verify expression filter rule syntax. p := parser.New() - _, _, err := p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr)) + mode, err := mysql.GetSQLMode(sqlMode) + if err != nil { + log.Error("failed to get sql mode", zap.Error(err)) + return cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode)) + } + p.SetSQLMode(mode) + _, _, err = p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr)) if err != nil { log.Error("failed to parse expression", zap.Error(err)) return cerror.ErrExpressionParseFailed. @@ -347,14 +355,18 @@ func getColumnFromError(err error) string { // dmlExprFilter is a filter that filters DML events by SQL expression. type dmlExprFilter struct { - rules []*dmlExprFilterRule + rules []*dmlExprFilterRule + sqlMODE string } func newExprFilter( timezone string, cfg *config.FilterConfig, + sqlMODE string, ) (*dmlExprFilter, error) { - res := &dmlExprFilter{} + res := &dmlExprFilter{ + sqlMODE: sqlMODE, + } sessCtx := utils.NewSessionCtx(map[string]string{ "time_zone": timezone, }) @@ -382,7 +394,7 @@ func (f *dmlExprFilter) addRule( // verify checks if all rules in this filter is valid. func (f *dmlExprFilter) verify(tableInfos []*model.TableInfo) error { for _, rule := range f.rules { - err := rule.verify(tableInfos) + err := rule.verify(tableInfos, f.sqlMODE) if err != nil { log.Error("failed to verify expression filter rule", zap.Error(err)) return errors.Trace(err) diff --git a/pkg/filter/expr_filter_bench_test.go b/pkg/filter/expr_filter_bench_test.go index 91c3fe903cc..62263e8e2f8 100644 --- a/pkg/filter/expr_filter_bench_test.go +++ b/pkg/filter/expr_filter_bench_test.go @@ -57,7 +57,7 @@ func BenchmarkSkipDML(b *testing.B) { sessCtx := utils.NewSessionCtx(map[string]string{ "time_zone": "", }) - f, err := newExprFilter("", cfg) + f, err := newExprFilter("", cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(b, err) type innerCase struct { diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go index 372ace442eb..66fe1f02aa8 100644 --- a/pkg/filter/expr_filter_test.go +++ b/pkg/filter/expr_filter_test.go @@ -324,7 +324,7 @@ func TestShouldSkipDMLBasic(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns) @@ -441,7 +441,7 @@ func TestShouldSkipDMLError(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns) @@ -634,7 +634,7 @@ func TestShouldSkipDMLTableUpdated(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) for _, c := range tc.cases { if c.updateDDl != "" { @@ -754,7 +754,7 @@ func TestVerify(t *testing.T) { ti := helper.execDDL(ddl) tableInfos = append(tableInfos, ti) } - f, err := newExprFilter("", tc.cfg) + f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.Nil(t, err) err = f.verify(tableInfos) require.True(t, errors.ErrorEqual(tc.err, err), "case: %+v", tc, err) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 3b321f16322..4afa4bb0516 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -97,11 +97,11 @@ func NewFilter(cfg *config.ReplicaConfig, tz string) (Filter, error) { f = tfilter.CaseInsensitive(f) } - dmlExprFilter, err := newExprFilter(tz, cfg.Filter) + dmlExprFilter, err := newExprFilter(tz, cfg.Filter, cfg.SQLMode) if err != nil { return nil, err } - sqlEventFilter, err := newSQLEventFilter(cfg.Filter) + sqlEventFilter, err := newSQLEventFilter(cfg.Filter, cfg.SQLMode) if err != nil { return nil, err } diff --git a/pkg/filter/sql_event_filter.go b/pkg/filter/sql_event_filter.go index d9e80d6233a..fdf3ab885bf 100644 --- a/pkg/filter/sql_event_filter.go +++ b/pkg/filter/sql_event_filter.go @@ -14,6 +14,7 @@ package filter import ( + "fmt" "sync" "github.com/pingcap/errors" @@ -21,6 +22,7 @@ import ( bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb/parser" timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" tfilter "github.com/pingcap/tidb/util/table-filter" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -107,9 +109,17 @@ type sqlEventFilter struct { rules []*sqlEventRule } -func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) { +func newSQLEventFilter(cfg *config.FilterConfig, sqlMode string) (*sqlEventFilter, error) { + p := parser.New() + mode, err := mysql.GetSQLMode(sqlMode) + if err != nil { + log.Error("failed to get sql mode", zap.Error(err)) + return nil, cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode)) + } + p.SetSQLMode(mode) + res := &sqlEventFilter{ - ddlParser: parser.New(), + ddlParser: p, } for _, rule := range cfg.EventFilters { if err := res.addRule(rule); err != nil { diff --git a/pkg/filter/sql_event_filter_test.go b/pkg/filter/sql_event_filter_test.go index a547d1a081c..935e242e40a 100644 --- a/pkg/filter/sql_event_filter_test.go +++ b/pkg/filter/sql_event_filter_test.go @@ -171,7 +171,7 @@ func TestShouldSkipDDL(t *testing.T) { } for _, tc := range testCases { - f, err := newSQLEventFilter(tc.cfg) + f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode) require.True(t, errors.ErrorEqual(err, tc.err), "case: %+s", err) for _, c := range tc.cases { ddl := &model.DDLEvent{ @@ -298,7 +298,11 @@ func TestShouldSkipDML(t *testing.T) { tCase := tc t.Run(tCase.name, func(t *testing.T) { t.Parallel() +<<<<<<< HEAD f, err := newSQLEventFilter(tCase.cfg) +======= + f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode) +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) require.NoError(t, err) for _, c := range tCase.cases { event := &model.RowChangedEvent{ diff --git a/pkg/filter/utils_test.go b/pkg/filter/utils_test.go index 119a848c2c5..bf9ef4050df 100644 --- a/pkg/filter/utils_test.go +++ b/pkg/filter/utils_test.go @@ -14,12 +14,14 @@ package filter import ( + "fmt" "testing" "github.com/pingcap/log" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb/parser" timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" tifilter "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -173,3 +175,61 @@ func TestDDLToTypeSpecialDDL(t *testing.T) { require.Equal(t, c.evenType, et, "case%v", c.ddl) } } + +func TestToDDLEventWithSQLMode(t *testing.T) { + t.Parallel() + cases := []struct { + name string + query string + jobTp timodel.ActionType + sqlMode string // sql mode + expect bf.EventType + errMsg string + }{ + { + name: "create table", + query: "create table t1(id int primary key)", + jobTp: timodel.ActionCreateTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.CreateTable, + }, + { + name: "drop table", + query: "drop table t1", + jobTp: timodel.ActionDropTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.DropTable, + }, + { // "" in table name or column name are not supported when sqlMode is set to ANSI_QUOTES + name: "create table 2", + query: `create table "t1" ("id" int primary key)`, + jobTp: timodel.ActionCreateTable, + sqlMode: config.GetDefaultReplicaConfig().SQLMode, + expect: bf.CreateTable, + errMsg: "ErrConvertDDLToEventTypeFailed", + }, + { // "" in table name or column name are supported when sqlMode is set to ANSI_QUOTES + name: "create table 3", + query: `create table "t1" ("id" int primary key)`, + jobTp: timodel.ActionCreateTable, + sqlMode: fmt.Sprint(config.GetDefaultReplicaConfig().SQLMode + ",ANSI_QUOTES"), + expect: bf.CreateTable, + }, + } + for _, c := range cases { + innerCase := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + p := parser.New() + mode, err := mysql.GetSQLMode(innerCase.sqlMode) + require.NoError(t, err) + p.SetSQLMode(mode) + tp, err := ddlToEventType(p, innerCase.query, innerCase.jobTp) + if innerCase.errMsg != "" { + require.Contains(t, err.Error(), innerCase.errMsg, innerCase.name) + } else { + require.Equal(t, innerCase.expect, tp) + } + }) + } +} diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index af79b999426..54dc537d801 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -128,7 +128,14 @@ func TestChangefeedStateUpdate(t *testing.T) { Protocol: "open-protocol", AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec, }, +<<<<<<< HEAD Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, +======= + Integrity: config.GetDefaultReplicaConfig().Integrity, + ChangefeedErrorStuckDuration: config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -179,7 +186,15 @@ func TestChangefeedStateUpdate(t *testing.T) { Protocol: "open-protocol", AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec, }, +<<<<<<< HEAD Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, +======= + Scheduler: config.GetDefaultReplicaConfig().Scheduler, + Integrity: config.GetDefaultReplicaConfig().Integrity, + ChangefeedErrorStuckDuration: config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -235,7 +250,15 @@ func TestChangefeedStateUpdate(t *testing.T) { Protocol: "open-protocol", AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec, }, +<<<<<<< HEAD Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, +======= + Scheduler: config.GetDefaultReplicaConfig().Scheduler, + Integrity: config.GetDefaultReplicaConfig().Integrity, + ChangefeedErrorStuckDuration: config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -320,10 +343,21 @@ func TestPatchInfo(t *testing.T) { SinkURI: "123", Engine: model.SortUnified, Config: &config.ReplicaConfig{ +<<<<<<< HEAD Filter: defaultConfig.Filter, Mounter: defaultConfig.Mounter, Sink: defaultConfig.Sink, Consistent: defaultConfig.Consistent, +======= + Filter: defaultConfig.Filter, + Mounter: defaultConfig.Mounter, + Sink: defaultConfig.Sink, + Consistent: defaultConfig.Consistent, + Scheduler: defaultConfig.Scheduler, + Integrity: defaultConfig.Integrity, + ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, + SQLMode: defaultConfig.SQLMode, +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -336,10 +370,21 @@ func TestPatchInfo(t *testing.T) { StartTs: 6, Engine: model.SortUnified, Config: &config.ReplicaConfig{ +<<<<<<< HEAD Filter: defaultConfig.Filter, Mounter: defaultConfig.Mounter, Sink: defaultConfig.Sink, Consistent: defaultConfig.Consistent, +======= + Filter: defaultConfig.Filter, + Mounter: defaultConfig.Mounter, + Sink: defaultConfig.Sink, + Consistent: defaultConfig.Consistent, + Scheduler: defaultConfig.Scheduler, + Integrity: defaultConfig.Integrity, + ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, + SQLMode: defaultConfig.SQLMode, +>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941)) }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { diff --git a/scripts/check-diff-line-width.sh b/scripts/check-diff-line-width.sh index 7b0ff7c9f13..beae6085b68 100755 --- a/scripts/check-diff-line-width.sh +++ b/scripts/check-diff-line-width.sh @@ -27,6 +27,7 @@ git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \ -- ':(exclude)*_gen.go' \ -- ':(exclude)*_gen_test.go' \ -- ':(exclude)*_mock.go' \ + -- ':(exclude)*_test_data.go' \ -- ':(exclude)*.pb.go' | grep -E '^\+' | grep -vE '^\+\+\+' | grep -E '^//' | sed 's/\t/ /g' |