
Commit

This is an automated cherry-pick of pingcap#9941
Signed-off-by: ti-chi-bot <[email protected]>
asddongmen authored and ti-chi-bot committed Nov 2, 2023
1 parent 4febedd commit ae4ee1e
Showing 16 changed files with 409 additions and 17 deletions.
17 changes: 17 additions & 0 deletions cdc/api/v2/model.go
@@ -185,10 +185,21 @@ type ReplicaConfig struct {
SyncPointInterval *JSONDuration `json:"sync_point_interval" swaggertype:"string"`
SyncPointRetention *JSONDuration `json:"sync_point_retention" swaggertype:"string"`

<<<<<<< HEAD
Filter *FilterConfig `json:"filter"`
Mounter *MounterConfig `json:"mounter"`
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent"`
=======
Filter *FilterConfig `json:"filter"`
Mounter *MounterConfig `json:"mounter"`
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent,omitempty"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`
ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"`
SQLMode string `json:"sql_mode,omitempty"`
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}

// ToInternalReplicaConfig converts *v2.ReplicaConfig into *config.ReplicaConfig
@@ -206,6 +217,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
<<<<<<< HEAD
=======
res.IgnoreIneligibleTable = c.IgnoreIneligibleTable
res.SQLMode = c.SQLMode
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
if c.SyncPointInterval != nil {
res.SyncPointInterval = c.SyncPointInterval.duration
}
@@ -348,6 +364,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SyncPointInterval: &JSONDuration{cloned.SyncPointInterval},
SyncPointRetention: &JSONDuration{cloned.SyncPointRetention},
BDRMode: cloned.BDRMode,
SQLMode: cloned.SQLMode,
}

if cloned.Filter != nil {
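The hunks above add a `sql_mode` field to the v2 API model and copy it into the internal replica config. As a rough illustration (not part of this commit), the following standalone sketch uses a hypothetical, trimmed stand-in for `ReplicaConfig` to show how the new JSON tag surfaces in an API payload:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// replicaConfigPayload is a hypothetical stand-in for v2.ReplicaConfig;
// only the field added by this commit is shown, with the tag from the diff.
type replicaConfigPayload struct {
	SQLMode string `json:"sql_mode,omitempty"`
}

func main() {
	cfg := replicaConfigPayload{SQLMode: "ANSI_QUOTES,STRICT_TRANS_TABLES"}
	body, err := json.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"sql_mode":"ANSI_QUOTES,STRICT_TRANS_TABLES"}
}
```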
18 changes: 18 additions & 0 deletions cdc/api/v2/model_test.go
@@ -62,6 +62,24 @@ var defaultAPIConfig = &ReplicaConfig{
Storage: "",
UseFileBackend: false,
},
<<<<<<< HEAD
=======
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: config.GetDefaultReplicaConfig().
Scheduler.EnableTableAcrossNodes,
RegionThreshold: config.GetDefaultReplicaConfig().
Scheduler.RegionThreshold,
WriteKeyThreshold: config.GetDefaultReplicaConfig().
Scheduler.WriteKeyThreshold,
},
Integrity: &IntegrityConfig{
IntegrityCheckLevel: config.GetDefaultReplicaConfig().Integrity.IntegrityCheckLevel,
CorruptionHandleLevel: config.GetDefaultReplicaConfig().Integrity.CorruptionHandleLevel,
},
ChangefeedErrorStuckDuration: &JSONDuration{*config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration},
SQLMode: config.GetDefaultReplicaConfig().SQLMode,
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}

func TestDefaultReplicaConfig(t *testing.T) {
84 changes: 84 additions & 0 deletions cdc/model/changefeed.go
@@ -323,6 +323,90 @@ func (info *ChangeFeedInfo) VerifyAndComplete() {
if info.Config.Consistent == nil {
info.Config.Consistent = defaultConfig.Consistent
}
<<<<<<< HEAD
=======
if info.Config.Scheduler == nil {
info.Config.Scheduler = defaultConfig.Scheduler
}

if info.Config.Integrity == nil {
info.Config.Integrity = defaultConfig.Integrity
}
if info.Config.ChangefeedErrorStuckDuration == nil {
info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration
}
if info.Config.SQLMode == "" {
info.Config.SQLMode = defaultConfig.SQLMode
}
info.RmUnusedFields()
}

// RmUnusedFields removes unnecessary fields based on the downstream type and
// the protocol. Since we utilize a common changefeed configuration template,
// certain fields may not be utilized for certain protocols.
func (info *ChangeFeedInfo) RmUnusedFields() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Warn(
"failed to parse the sink uri",
zap.Error(err),
zap.Any("sinkUri", info.SinkURI),
)
return
}
if !sink.IsMQScheme(uri.Scheme) {
info.rmMQOnlyFields()
} else {
// remove schema registry for MQ downstream with
// protocol other than avro
if util.GetOrZero(info.Config.Sink.Protocol) != config.ProtocolAvro.String() {
info.Config.Sink.SchemaRegistry = nil
}
}

if !sink.IsStorageScheme(uri.Scheme) {
info.rmStorageOnlyFields()
}

if !sink.IsMySQLCompatibleScheme(uri.Scheme) {
info.rmDBOnlyFields()
} else {
// remove fields only being used by MQ and Storage downstream
info.Config.Sink.Protocol = nil
info.Config.Sink.Terminator = nil
}
}

func (info *ChangeFeedInfo) rmMQOnlyFields() {
log.Info("since the downstream is not a MQ, remove MQ only fields",
zap.String("namespace", info.Namespace),
zap.String("changefeed", info.ID))
info.Config.Sink.DispatchRules = nil
info.Config.Sink.SchemaRegistry = nil
info.Config.Sink.EncoderConcurrency = nil
info.Config.Sink.EnableKafkaSinkV2 = nil
info.Config.Sink.OnlyOutputUpdatedColumns = nil
info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil
info.Config.Sink.KafkaConfig = nil
}

func (info *ChangeFeedInfo) rmStorageOnlyFields() {
info.Config.Sink.CSVConfig = nil
info.Config.Sink.DateSeparator = nil
info.Config.Sink.EnablePartitionSeparator = nil
info.Config.Sink.FileIndexWidth = nil
info.Config.Sink.CloudStorageConfig = nil
}

func (info *ChangeFeedInfo) rmDBOnlyFields() {
info.Config.EnableSyncPoint = nil
info.Config.BDRMode = nil
info.Config.SyncPointInterval = nil
info.Config.SyncPointRetention = nil
info.Config.Consistent = nil
info.Config.Sink.SafeMode = nil
info.Config.Sink.MySQLConfig = nil
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}

// FixIncompatible fixes incompatible changefeed meta info.
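The new `RmUnusedFields` helper dispatches on the sink URI scheme and clears field groups that the chosen downstream cannot use. The standalone sketch below shows the same decision flow; the scheme sets are simplified assumptions for illustration and may not match tiflow's exact `sink.IsMQScheme`/`IsStorageScheme`/`IsMySQLCompatibleScheme` lists:

```go
package main

import (
	"fmt"
	"net/url"
)

// fieldGroupsToDrop mirrors RmUnusedFields' dispatch: parse the sink URI and
// decide which configuration groups are unused for that downstream.
// NOTE: the scheme sets below are illustrative assumptions.
func fieldGroupsToDrop(sinkURI string) ([]string, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return nil, err
	}
	mq := map[string]bool{"kafka": true, "kafka+ssl": true}
	storage := map[string]bool{"s3": true, "gcs": true, "azblob": true, "file": true}
	db := map[string]bool{"mysql": true, "mysql+ssl": true, "tidb": true, "tidb+ssl": true}

	var drop []string
	if !mq[u.Scheme] {
		drop = append(drop, "MQ-only fields (dispatch rules, schema registry, Kafka config, ...)")
	}
	if !storage[u.Scheme] {
		drop = append(drop, "storage-only fields (CSV config, date separator, file index width, ...)")
	}
	if !db[u.Scheme] {
		drop = append(drop, "DB-only fields (sync point, BDR mode, redo/consistent, safe mode, ...)")
	} else {
		drop = append(drop, "protocol and terminator (used only by MQ/storage sinks)")
	}
	return drop, nil
}

func main() {
	groups, err := fieldGroupsToDrop("mysql://root@127.0.0.1:3306/")
	if err != nil {
		panic(err)
	}
	for _, g := range groups {
		fmt.Println("drop:", g)
	}
}
```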
17 changes: 16 additions & 1 deletion cdc/owner/ddl_sink.go
@@ -25,7 +25,11 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
<<<<<<< HEAD
"github.com/pingcap/tiflow/cdc/contextutil"
=======
"github.com/pingcap/tidb/parser/mysql"
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
"github.com/pingcap/tiflow/cdc/model"
sinkv1 "github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/cdc/sink/mysql"
@@ -454,7 +458,18 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) {

// addSpecialComment translate tidb feature to comment
func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate)
p := parser.New()
// We need to use the correct SQL mode to parse the DDL query.
// Otherwise, the parser may fail to parse the DDL query.
// For example, it is needed to parse the following DDL query:
// `alter table "t" add column "c" int default 1;`
// by adding `ANSI_QUOTES` to the SQL mode.
mode, err := mysql.GetSQLMode(s.info.Config.SQLMode)
if err != nil {
return "", errors.Trace(err)
}
p.SetSQLMode(mode)
stms, _, err := p.Parse(ddl.Query, ddl.Charset, ddl.Collate)
if err != nil {
return "", errors.Trace(err)
}
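The comment in `addSpecialComment` explains why the parser needs the changefeed's SQL mode: with `ANSI_QUOTES`, double quotes delimit identifiers rather than string literals. A self-contained sketch of that behavior, using the same tidb parser packages imported in the hunk above (the exact error text and default-mode behavior are assumptions to verify against your parser version):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/parser"
	"github.com/pingcap/tidb/parser/mysql"
)

func main() {
	// With the default SQL mode, double quotes delimit string literals,
	// so this DDL is expected to fail to parse.
	ddl := `ALTER TABLE "t" ADD COLUMN "c" INT DEFAULT 1`

	p := parser.New()
	if _, _, err := p.Parse(ddl, "", ""); err != nil {
		fmt.Println("default SQL mode:", err)
	}

	// With ANSI_QUOTES, double quotes delimit identifiers and parsing succeeds.
	mode, err := mysql.GetSQLMode("ANSI_QUOTES")
	if err != nil {
		panic(err)
	}
	p.SetSQLMode(mode)
	if _, _, err := p.Parse(ddl, "", ""); err == nil {
		fmt.Println("ANSI_QUOTES: parsed successfully")
	}
}
```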
16 changes: 15 additions & 1 deletion cdc/owner/ddl_sink_test.go
@@ -22,7 +22,12 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
<<<<<<< HEAD
"github.com/pingcap/tiflow/cdc/sink"
=======
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/pkg/config"
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/stretchr/testify/require"
@@ -65,7 +70,13 @@ func (m *mockSink) GetDDL() *model.DDLEvent {

func newDDLSink4Test(reportErr func(err error), reportWarn func(err error)) (DDLSink, *mockSink) {
mockSink := &mockSink{}
ddlSink := newDDLSink(model.DefaultChangeFeedID("changefeed-test"), &model.ChangeFeedInfo{}, reportErr, reportWarn)
ddlSink := newDDLSink(
model.DefaultChangeFeedID("changefeed-test"),
&model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
},
reportErr,
reportWarn)
ddlSink.(*ddlSinkImpl).sinkInitHandler = func(ctx context.Context, s *ddlSinkImpl) error {
s.sinkV1 = mockSink
return nil
@@ -487,6 +498,9 @@ func TestAddSpecialComment(t *testing.T) {
}

s := &ddlSinkImpl{}
s.info = &model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
}
for _, ca := range testCase {
re, err := s.addSpecialComment(ca.event)
require.Nil(t, err)
46 changes: 46 additions & 0 deletions pkg/config/config_test_data.go
@@ -62,7 +62,21 @@ const (
"flush-interval": 2000,
"storage": "",
"use-file-backend": false
<<<<<<< HEAD
}
=======
},
"scheduler": {
"enable-table-across-nodes": false,
"region-threshold": 100000
},
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
},
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}`

testCfgTestServerConfigMarshal = `{
@@ -213,7 +227,24 @@ const (
"flush-interval": 2000,
"storage": "",
"use-file-backend": false
<<<<<<< HEAD
}
=======
},
"scheduler": {
"enable-table-across-nodes": true,
"region-per-span": 0,
"region-threshold": 100001,
"write-key-threshold": 100001,
"region-per-span": 0
},
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
},
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}`

testCfgTestReplicaConfigMarshal2 = `{
@@ -273,6 +304,21 @@ const (
"flush-interval": 2000,
"storage": "",
"use-file-backend": false
<<<<<<< HEAD
}
=======
},
"scheduler": {
"enable-table-across-nodes": true,
"region-threshold": 100001,
"write-key-threshold": 100001
},
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
},
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}`
)
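The `sql-mode` string embedded in these test constants is expected to match TiDB's default SQL mode. A quick sanity sketch, assuming `mysql.DefaultSQLMode` carries that exact string (as the `defaultSQLMode = mysql.DefaultSQLMode` assignment in the next file suggests):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/parser/mysql"
)

func main() {
	// Expected to print the mode list embedded in the test JSON above:
	// ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,
	// ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
	fmt.Println(mysql.DefaultSQLMode)
}
```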
66 changes: 66 additions & 0 deletions pkg/config/replica_config.go
@@ -22,6 +22,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/pkg/config/outdated"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
@@ -33,7 +34,18 @@ const (
// minSyncPointInterval is the minimum of SyncPointInterval can be set.
minSyncPointInterval = time.Second * 30
// minSyncPointRetention is the minimum of SyncPointRetention can be set.
<<<<<<< HEAD
minSyncPointRetention = time.Hour * 1
=======
minSyncPointRetention = time.Hour * 1
minChangeFeedErrorStuckDuration = time.Minute * 30
// The default SQL Mode of TiDB: "ONLY_FULL_GROUP_BY,
// STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,
// NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
// Note: The SQL Mode of TiDB is not the same as ORACLE.
// If you want to use the same SQL Mode as ORACLE, you need to add "ORACLE" to the SQL Mode.
defaultSQLMode = mysql.DefaultSQLMode
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
)

var defaultReplicaConfig = &ReplicaConfig{
@@ -71,6 +83,20 @@ var defaultReplicaConfig = &ReplicaConfig{
Storage: "",
UseFileBackend: false,
},
<<<<<<< HEAD
=======
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
RegionThreshold: 100_000,
WriteKeyThreshold: 0,
},
Integrity: &integrity.Config{
IntegrityCheckLevel: integrity.CheckLevelNone,
CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn,
},
ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30),
SQLMode: defaultSQLMode,
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}

// GetDefaultReplicaConfig returns the default replica config.
@@ -103,13 +129,53 @@ type replicaConfig struct {
// BDR(Bidirectional Replication) is a feature that allows users to
// replicate data of same tables from TiDB-1 to TiDB-2 and vice versa.
// This feature is only available for TiDB.
<<<<<<< HEAD
BDRMode bool `toml:"bdr-mode" json:"bdr-mode"`
SyncPointInterval time.Duration `toml:"sync-point-interval" json:"sync-point-interval"`
SyncPointRetention time.Duration `toml:"sync-point-retention" json:"sync-point-retention"`
Filter *FilterConfig `toml:"filter" json:"filter"`
Mounter *MounterConfig `toml:"mounter" json:"mounter"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Consistent *ConsistentConfig `toml:"consistent" json:"consistent"`
=======
BDRMode *bool `toml:"bdr-mode" json:"bdr-mode,omitempty"`
// SyncPointInterval is only available when the downstream is DB.
SyncPointInterval *time.Duration `toml:"sync-point-interval" json:"sync-point-interval,omitempty"`
// SyncPointRetention is only available when the downstream is DB.
SyncPointRetention *time.Duration `toml:"sync-point-retention" json:"sync-point-retention,omitempty"`
Filter *FilterConfig `toml:"filter" json:"filter"`
Mounter *MounterConfig `toml:"mounter" json:"mounter"`
Sink *SinkConfig `toml:"sink" json:"sink"`
// Consistent is only available for DB downstream with redo feature enabled.
Consistent *ConsistentConfig `toml:"consistent" json:"consistent,omitempty"`
// Scheduler is the configuration for scheduler.
Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"`
// Integrity is only available when the downstream is MQ.
Integrity *integrity.Config `toml:"integrity" json:"integrity"`
ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"`
SQLMode string `toml:"sql-mode" json:"sql-mode"`
}

// Value implements the driver.Valuer interface
func (c ReplicaConfig) Value() (driver.Value, error) {
cfg, err := c.Marshal()
if err != nil {
return nil, err
}

// TODO: refactor the meaningless type conversion.
return []byte(cfg), nil
}

// Scan implements the sql.Scanner interface
func (c *ReplicaConfig) Scan(value interface{}) error {
b, ok := value.([]byte)
if !ok {
return errors.New("type assertion to []byte failed")
}

return c.UnmarshalJSON(b)
>>>>>>> 365113c9cc (config, changefeed (ticdc): add sql mode config for changefeed (#9941))
}

// Marshal returns the json marshal format of a ReplicationConfig
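The new `Value`/`Scan` methods added in this hunk let a `ReplicaConfig` be stored in a SQL column as a JSON blob and rehydrated on read. A minimal sketch of the same `driver.Valuer`/`sql.Scanner` pattern on a hypothetical trimmed struct (not tiflow's actual type):

```go
package main

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
)

// miniReplicaConfig is a hypothetical stand-in with one field from the diff.
type miniReplicaConfig struct {
	SQLMode string `json:"sql-mode"`
}

// Value implements driver.Valuer: serialize to JSON for the database column.
func (c miniReplicaConfig) Value() (driver.Value, error) {
	return json.Marshal(c)
}

// Scan implements sql.Scanner: deserialize the JSON blob read from the column.
func (c *miniReplicaConfig) Scan(value interface{}) error {
	b, ok := value.([]byte)
	if !ok {
		return errors.New("type assertion to []byte failed")
	}
	return json.Unmarshal(b, c)
}

func main() {
	in := miniReplicaConfig{SQLMode: "ANSI_QUOTES"}
	v, _ := in.Value()

	var out miniReplicaConfig
	_ = out.Scan(v)
	fmt.Printf("%+v\n", out) // {SQLMode:ANSI_QUOTES}
}
```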
(Diffs for the remaining 9 of the 16 changed files are not shown here.)

