Skip to content

Commit

Permalink
dm: add strict optimist mode (#9113) (#9540)
Browse files Browse the repository at this point in the history
close #9112
  • Loading branch information
ti-chi-bot authored Oct 16, 2023
1 parent 69e9d10 commit ff869a6
Show file tree
Hide file tree
Showing 22 changed files with 754 additions and 548 deletions.
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ ErrConfigInvalidPhysicalDuplicateResolution,[code=20062:class=config:scope=inter
ErrConfigInvalidPhysicalChecksum,[code=20063:class=config:scope=internal:level=medium], "Message: invalid load checksum-physical option '%s', Workaround: Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
ErrConfigColumnMappingDeprecated,[code=20064:class=config:scope=internal:level=high], "Message: column-mapping is not supported since v6.6.0, Workaround: Please use extract-table/extract-schema/extract-source to handle data conflict when merge tables. See https://docs.pingcap.com/tidb/v6.4/task-configuration-file-full#task-configuration-file-template-advanced"
ErrConfigInvalidLoadAnalyze,[code=20065:class=config:scope=internal:level=medium], "Message: invalid load analyze option '%s', Workaround: Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
ErrConfigStrictOptimisticShardMode,[code=20066:class=config:scope=internal:level=medium], "Message: cannot enable `strict-optimistic-shard-mode` while `shard-mode` is not `optimistic`, Workaround: Please set `shard-mode` to `optimistic` if you want to enable `strict-optimistic-shard-mode`."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
10 changes: 7 additions & 3 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ type SubTaskConfig struct {
flagSet *flag.FlagSet

// when in sharding, multi dm-workers do one task
IsSharding bool `toml:"is-sharding" json:"is-sharding"`
ShardMode string `toml:"shard-mode" json:"shard-mode"`
OnlineDDL bool `toml:"online-ddl" json:"online-ddl"`
IsSharding bool `toml:"is-sharding" json:"is-sharding"`
ShardMode string `toml:"shard-mode" json:"shard-mode"`
StrictOptimisticShardMode bool `toml:"strict-optimistic-shard-mode" json:"strict-optimistic-shard-mode"`
OnlineDDL bool `toml:"online-ddl" json:"online-ddl"`

// pt/gh-ost name rule, support regex
ShadowTableRules []string `yaml:"shadow-table-rules" toml:"shadow-table-rules" json:"shadow-table-rules"`
Expand Down Expand Up @@ -288,6 +289,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
} else if c.ShardMode == "" && c.IsSharding {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}
if c.StrictOptimisticShardMode && c.ShardMode != ShardOptimistic {
return terror.ErrConfigStrictOptimisticShardMode.Generate()
}

if len(c.ColumnMappingRules) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
Expand Down
82 changes: 44 additions & 38 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,11 @@ func defaultValidatorConfig() ValidatorConfig {
type TaskConfig struct {
*flag.FlagSet `yaml:"-" toml:"-" json:"-"`

Name string `yaml:"name" toml:"name" json:"name"`
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
Name string `yaml:"name" toml:"name" json:"name"`
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
StrictOptimisticShardMode bool `yaml:"strict-optimistic-shard-mode" toml:"strict-optimistic-shard-mode" json:"strict-optimistic-shard-mode"`
// treat it as hidden configuration
IgnoreCheckingItems []string `yaml:"ignore-checking-items" toml:"ignore-checking-items" json:"ignore-checking-items"`
// we store detail status in meta
Expand Down Expand Up @@ -682,6 +683,9 @@ func (c *TaskConfig) adjust() error {
} else if c.ShardMode == "" && c.IsSharding {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}
if c.StrictOptimisticShardMode && c.ShardMode != ShardOptimistic {
return terror.ErrConfigStrictOptimisticShardMode.Generate()
}

if len(c.ColumnMappings) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
Expand Down Expand Up @@ -1237,45 +1241,47 @@ type TaskConfigForDowngrade struct {
EnableANSIQuotes bool `yaml:"ansi-quotes"`
RemoveMeta bool `yaml:"remove-meta"`
// new config item
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
ShadowTableRules []string `yaml:"shadow-table-rules,omitempty"`
TrashTableRules []string `yaml:"trash-table-rules,omitempty"`
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
ShadowTableRules []string `yaml:"shadow-table-rules,omitempty"`
TrashTableRules []string `yaml:"trash-table-rules,omitempty"`
StrictOptimisticShardMode bool `yaml:"strict-optimistic-shard-mode,omitempty"`
}

// NewTaskConfigForDowngrade create new TaskConfigForDowngrade.
func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
return &TaskConfigForDowngrade{
Name: taskConfig.Name,
TaskMode: taskConfig.TaskMode,
IsSharding: taskConfig.IsSharding,
ShardMode: taskConfig.ShardMode,
IgnoreCheckingItems: taskConfig.IgnoreCheckingItems,
MetaSchema: taskConfig.MetaSchema,
EnableHeartbeat: taskConfig.EnableHeartbeat,
HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval,
HeartbeatReportInterval: taskConfig.HeartbeatReportInterval,
Timezone: taskConfig.Timezone,
CaseSensitive: taskConfig.CaseSensitive,
TargetDB: taskConfig.TargetDB,
OnlineDDLScheme: taskConfig.OnlineDDLScheme,
Routes: taskConfig.Routes,
Filters: taskConfig.Filters,
ColumnMappings: taskConfig.ColumnMappings,
BWList: taskConfig.BWList,
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: NewLoaderConfigForDowngrade(taskConfig.Loaders),
Syncers: NewSyncerConfigsForDowngrade(taskConfig.Syncers),
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
ShadowTableRules: taskConfig.ShadowTableRules,
TrashTableRules: taskConfig.TrashTableRules,
Name: taskConfig.Name,
TaskMode: taskConfig.TaskMode,
IsSharding: taskConfig.IsSharding,
ShardMode: taskConfig.ShardMode,
StrictOptimisticShardMode: taskConfig.StrictOptimisticShardMode,
IgnoreCheckingItems: taskConfig.IgnoreCheckingItems,
MetaSchema: taskConfig.MetaSchema,
EnableHeartbeat: taskConfig.EnableHeartbeat,
HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval,
HeartbeatReportInterval: taskConfig.HeartbeatReportInterval,
Timezone: taskConfig.Timezone,
CaseSensitive: taskConfig.CaseSensitive,
TargetDB: taskConfig.TargetDB,
OnlineDDLScheme: taskConfig.OnlineDDLScheme,
Routes: taskConfig.Routes,
Filters: taskConfig.Filters,
ColumnMappings: taskConfig.ColumnMappings,
BWList: taskConfig.BWList,
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: NewLoaderConfigForDowngrade(taskConfig.Loaders),
Syncers: NewSyncerConfigsForDowngrade(taskConfig.Syncers),
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
ShadowTableRules: taskConfig.ShadowTableRules,
TrashTableRules: taskConfig.TrashTableRules,
}
}

Expand Down
6 changes: 6 additions & 0 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]dbconfig.DBCon
cfg := NewSubTaskConfig()
cfg.IsSharding = c.IsSharding
cfg.ShardMode = c.ShardMode
cfg.StrictOptimisticShardMode = c.StrictOptimisticShardMode
cfg.OnlineDDL = c.OnlineDDL
cfg.TrashTableRules = c.TrashTableRules
cfg.ShadowTableRules = c.ShadowTableRules
Expand Down Expand Up @@ -181,6 +182,9 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig,
} else {
subTaskCfg.IsSharding = false
}
if task.StrictOptimisticShardMode != nil {
subTaskCfg.StrictOptimisticShardMode = *task.StrictOptimisticShardMode
}
// set online ddl plugin config
subTaskCfg.OnlineDDL = task.EnhanceOnlineSchemaChange
// set case sensitive from source
Expand Down Expand Up @@ -314,6 +318,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.TaskMode = stCfg0.Mode
c.IsSharding = stCfg0.IsSharding
c.ShardMode = stCfg0.ShardMode
c.StrictOptimisticShardMode = stCfg0.StrictOptimisticShardMode
c.IgnoreCheckingItems = stCfg0.IgnoreCheckingItems
c.MetaSchema = stCfg0.MetaSchema
c.EnableHeartbeat = stCfg0.EnableHeartbeat
Expand Down Expand Up @@ -611,6 +616,7 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
taskShardMode := openapi.TaskShardMode(oneSubtaskConfig.ShardMode)
task.ShardMode = &taskShardMode
}
task.StrictOptimisticShardMode = &oneSubtaskConfig.StrictOptimisticShardMode
if len(filterMap) > 0 {
task.BinlogFilterRule = &filterRuleMap
}
Expand Down
5 changes: 3 additions & 2 deletions dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTask1Config.Meta.BinLogPos, check.Equals, uint32(*task.SourceConfig.SourceConf[0].BinlogPos))

// check shard config
c.Assert(subTask1Config.ShardMode, check.Equals, string(openapi.TaskShardModePessimistic))
c.Assert(subTask1Config.ShardMode, check.Equals, string(openapi.TaskShardModeOptimistic))
c.Assert(subTask1Config.StrictOptimisticShardMode, check.IsTrue)
// check online schema change
c.Assert(subTask1Config.OnlineDDL, check.Equals, true)
// check case sensitive
Expand Down Expand Up @@ -218,7 +219,7 @@ func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTask2Config.Meta.BinLogName, check.Equals, *task.SourceConfig.SourceConf[1].BinlogName)
c.Assert(subTask2Config.Meta.BinLogPos, check.Equals, uint32(*task.SourceConfig.SourceConf[1].BinlogPos))
// check shard config
c.Assert(subTask2Config.ShardMode, check.Equals, string(openapi.TaskShardModePessimistic))
c.Assert(subTask2Config.ShardMode, check.Equals, string(openapi.TaskShardModeOptimistic))
// check online schema change
c.Assert(subTask2Config.OnlineDDL, check.Equals, true)
// check case sensitive
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,12 @@ description = ""
workaround = "Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-config-20066]
message = "cannot enable `strict-optimistic-shard-mode` while `shard-mode` is not `optimistic`"
description = ""
workaround = "Please set `shard-mode` to `optimistic` if you want to enable `strict-optimistic-shard-mode`."
tags = ["internal", "medium"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
6 changes: 4 additions & 2 deletions dm/openapi/fixtures/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ var (
"security": null,
"user": "root"
},
"task_mode": "all"
"task_mode": "all",
"strict_optimistic_shard_mode": false
}
`

Expand Down Expand Up @@ -103,7 +104,8 @@ var (
"meta_schema": "dm_meta",
"name": "test",
"on_duplicate": "error",
"shard_mode": "pessimistic",
"shard_mode": "optimistic",
"strict_optimistic_shard_mode": true,
"source_config": {
"full_migrate_conf": {
"data_dir": "./exported_data",
Expand Down
Loading

0 comments on commit ff869a6

Please sign in to comment.