Skip to content

Commit

Permalink
config, changefeed (ticdc): add sql mode config for changefeed (#9941)
Browse files Browse the repository at this point in the history
close #9876
  • Loading branch information
asddongmen authored Nov 2, 2023
1 parent 684d117 commit 365113c
Show file tree
Hide file tree
Showing 16 changed files with 151 additions and 22 deletions.
5 changes: 4 additions & 1 deletion cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ type ReplicaConfig struct {
Consistent *ConsistentConfig `json:"consistent,omitempty"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`
ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty" swaggertype:"string"`
ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"`
SQLMode string `json:"sql_mode,omitempty"`
}

// ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig
Expand All @@ -211,6 +212,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
res.IgnoreIneligibleTable = c.IgnoreIneligibleTable
res.SQLMode = c.SQLMode
if c.SyncPointInterval != nil {
res.SyncPointInterval = &c.SyncPointInterval.duration
}
Expand Down Expand Up @@ -498,6 +500,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
CheckGCSafePoint: cloned.CheckGCSafePoint,
EnableSyncPoint: cloned.EnableSyncPoint,
BDRMode: cloned.BDRMode,
SQLMode: cloned.SQLMode,
}

if cloned.SyncPointInterval != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ var defaultAPIConfig = &ReplicaConfig{
},
ChangefeedErrorStuckDuration: &JSONDuration{*config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration},
SQLMode: config.GetDefaultReplicaConfig().SQLMode,
}

func TestDefaultReplicaConfig(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ func (info *ChangeFeedInfo) VerifyAndComplete() {
if info.Config.ChangefeedErrorStuckDuration == nil {
info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration
}
if info.Config.SQLMode == "" {
info.Config.SQLMode = defaultConfig.SQLMode
}
info.RmUnusedFields()
}

Expand Down
14 changes: 13 additions & 1 deletion cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/factory"
Expand Down Expand Up @@ -426,7 +427,18 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) {

// addSpecialComment translate tidb feature to comment
func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate)
p := parser.New()
// We need to use the correct SQL mode to parse the DDL query.
// Otherwise, the parser may fail to parse the DDL query.
// For example, it is needed to parse the following DDL query:
// `alter table "t" add column "c" int default 1;`
// by adding `ANSI_QUOTES` to the SQL mode.
mode, err := mysql.GetSQLMode(s.info.Config.SQLMode)
if err != nil {
return "", errors.Trace(err)
}
p.SetSQLMode(mode)
stms, _, err := p.Parse(ddl.Query, ddl.Charset, ddl.Collate)
if err != nil {
return "", errors.Trace(err)
}
Expand Down
12 changes: 11 additions & 1 deletion cdc/owner/ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -61,7 +62,13 @@ func (m *mockSink) GetDDL() *model.DDLEvent {

func newDDLSink4Test(reportErr func(err error), reportWarn func(err error)) (DDLSink, *mockSink) {
mockSink := &mockSink{}
ddlSink := newDDLSink(model.DefaultChangeFeedID("changefeed-test"), &model.ChangeFeedInfo{}, reportErr, reportWarn)
ddlSink := newDDLSink(
model.DefaultChangeFeedID("changefeed-test"),
&model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
},
reportErr,
reportWarn)
ddlSink.(*ddlSinkImpl).sinkInitHandler = func(ctx context.Context, s *ddlSinkImpl) error {
s.sink = mockSink
return nil
Expand Down Expand Up @@ -545,6 +552,9 @@ func TestAddSpecialComment(t *testing.T) {
}

s := &ddlSinkImpl{}
s.info = &model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
}
for _, ca := range testCase {
re, err := s.addSpecialComment(ca.event)
require.Nil(t, err)
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ const (
"integrity-check-level": "none",
"corruption-handle-level": "warn"
},
"changefeed-error-stuck-duration": 1800000000000
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
}`

testCfgTestServerConfigMarshal = `{
Expand Down Expand Up @@ -319,7 +320,8 @@ const (
"integrity-check-level": "none",
"corruption-handle-level": "warn"
},
"changefeed-error-stuck-duration": 1800000000000
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
}`

testCfgTestReplicaConfigMarshal2 = `{
Expand Down Expand Up @@ -465,6 +467,7 @@ const (
"integrity-check-level": "none",
"corruption-handle-level": "warn"
},
"changefeed-error-stuck-duration": 1800000000000
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
}`
)
9 changes: 9 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/pkg/config/outdated"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
Expand All @@ -38,6 +39,12 @@ const (
// minSyncPointRetention is the minimum of SyncPointRetention can be set.
minSyncPointRetention = time.Hour * 1
minChangeFeedErrorStuckDuration = time.Minute * 30
// The default SQL Mode of TiDB: "ONLY_FULL_GROUP_BY,
// STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,
// NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
// Note: The SQL Mode of TiDB is not the same as ORACLE.
// If you want to use the same SQL Mode as ORACLE, you need to add "ORACLE" to the SQL Mode.
defaultSQLMode = mysql.DefaultSQLMode
)

var defaultReplicaConfig = &ReplicaConfig{
Expand Down Expand Up @@ -89,6 +96,7 @@ var defaultReplicaConfig = &ReplicaConfig{
CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn,
},
ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30),
SQLMode: defaultSQLMode,
}

// GetDefaultReplicaConfig returns the default replica config.
Expand Down Expand Up @@ -140,6 +148,7 @@ type replicaConfig struct {
// Integrity is only available when the downstream is MQ.
Integrity *integrity.Config `toml:"integrity" json:"integrity"`
ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"`
SQLMode string `toml:"sql-mode" json:"sql-mode"`
}

// Value implements the driver.Valuer interface
Expand Down
22 changes: 17 additions & 5 deletions pkg/filter/expr_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package filter

import (
"fmt"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -77,10 +79,16 @@ func newExprFilterRule(

// verifyAndInitRule will verify and init the rule.
// It should only be called in dmlExprFilter's verify method.
func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo) error {
func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo, sqlMode string) error {
// verify expression filter rule syntax.
p := parser.New()
_, _, err := p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr))
mode, err := mysql.GetSQLMode(sqlMode)
if err != nil {
log.Error("failed to get sql mode", zap.Error(err))
return cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode))
}
p.SetSQLMode(mode)
_, _, err = p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr))
if err != nil {
log.Error("failed to parse expression", zap.Error(err))
return cerror.ErrExpressionParseFailed.
Expand Down Expand Up @@ -347,14 +355,18 @@ func getColumnFromError(err error) string {

// dmlExprFilter is a filter that filters DML events by SQL expression.
type dmlExprFilter struct {
rules []*dmlExprFilterRule
rules []*dmlExprFilterRule
sqlMODE string
}

func newExprFilter(
timezone string,
cfg *config.FilterConfig,
sqlMODE string,
) (*dmlExprFilter, error) {
res := &dmlExprFilter{}
res := &dmlExprFilter{
sqlMODE: sqlMODE,
}
sessCtx := utils.NewSessionCtx(map[string]string{
"time_zone": timezone,
})
Expand Down Expand Up @@ -382,7 +394,7 @@ func (f *dmlExprFilter) addRule(
// verify checks if all rules in this filter is valid.
func (f *dmlExprFilter) verify(tableInfos []*model.TableInfo) error {
for _, rule := range f.rules {
err := rule.verify(tableInfos)
err := rule.verify(tableInfos, f.sqlMODE)
if err != nil {
log.Error("failed to verify expression filter rule", zap.Error(err))
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/expr_filter_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func BenchmarkSkipDML(b *testing.B) {
sessCtx := utils.NewSessionCtx(map[string]string{
"time_zone": "",
})
f, err := newExprFilter("", cfg)
f, err := newExprFilter("", cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(b, err)

type innerCase struct {
Expand Down
8 changes: 4 additions & 4 deletions pkg/filter/expr_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestShouldSkipDMLBasic(t *testing.T) {

for _, tc := range testCases {
tableInfo := helper.execDDL(tc.ddl)
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
for _, c := range tc.cases {
rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns)
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestShouldSkipDMLError(t *testing.T) {

for _, tc := range testCases {
tableInfo := helper.execDDL(tc.ddl)
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
for _, c := range tc.cases {
rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns)
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestShouldSkipDMLTableUpdated(t *testing.T) {

for _, tc := range testCases {
tableInfo := helper.execDDL(tc.ddl)
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
for _, c := range tc.cases {
if c.updateDDl != "" {
Expand Down Expand Up @@ -754,7 +754,7 @@ func TestVerify(t *testing.T) {
ti := helper.execDDL(ddl)
tableInfos = append(tableInfos, ti)
}
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
err = f.verify(tableInfos)
require.True(t, errors.ErrorEqual(tc.err, err), "case: %+v", tc, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func NewFilter(cfg *config.ReplicaConfig, tz string) (Filter, error) {
f = tfilter.CaseInsensitive(f)
}

dmlExprFilter, err := newExprFilter(tz, cfg.Filter)
dmlExprFilter, err := newExprFilter(tz, cfg.Filter, cfg.SQLMode)
if err != nil {
return nil, err
}
sqlEventFilter, err := newSQLEventFilter(cfg.Filter)
sqlEventFilter, err := newSQLEventFilter(cfg.Filter, cfg.SQLMode)
if err != nil {
return nil, err
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/filter/sql_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package filter

import (
"fmt"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb/parser"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
tfilter "github.com/pingcap/tidb/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -107,9 +109,17 @@ type sqlEventFilter struct {
rules []*sqlEventRule
}

func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) {
func newSQLEventFilter(cfg *config.FilterConfig, sqlMode string) (*sqlEventFilter, error) {
p := parser.New()
mode, err := mysql.GetSQLMode(sqlMode)
if err != nil {
log.Error("failed to get sql mode", zap.Error(err))
return nil, cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode))
}
p.SetSQLMode(mode)

res := &sqlEventFilter{
ddlParser: parser.New(),
ddlParser: p,
}
for _, rule := range cfg.EventFilters {
if err := res.addRule(rule); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/sql_event_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestShouldSkipDDL(t *testing.T) {
}

for _, tc := range testCases {
f, err := newSQLEventFilter(tc.cfg)
f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.True(t, errors.ErrorEqual(err, tc.err), "case: %+s", err)
for _, c := range tc.cases {
ddl := &model.DDLEvent{
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestShouldSkipDML(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, err := newSQLEventFilter(tc.cfg)
f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.NoError(t, err)
for _, c := range tc.cases {
event := &model.RowChangedEvent{
Expand Down
Loading

0 comments on commit 365113c

Please sign in to comment.