Commit

Merge remote-tracking branch 'upstream/release-7.1' into cherry-pick-9813-to-release-7.1

asddongmen committed Dec 7, 2023
2 parents 5ae1616 + de95a8e commit 27b4ba0
Showing 100 changed files with 4,309 additions and 1,379 deletions.
45 changes: 30 additions & 15 deletions cdc/api/v2/model.go
@@ -260,11 +260,15 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
     }
     if c.Consistent != nil {
         res.Consistent = &config.ConsistentConfig{
-            Level:             c.Consistent.Level,
-            MaxLogSize:        c.Consistent.MaxLogSize,
-            FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
-            Storage:           c.Consistent.Storage,
-            UseFileBackend:    c.Consistent.UseFileBackend,
+            Level:                 c.Consistent.Level,
+            MaxLogSize:            c.Consistent.MaxLogSize,
+            FlushIntervalInMs:     c.Consistent.FlushIntervalInMs,
+            MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
+            EncodingWorkerNum:     c.Consistent.EncodingWorkerNum,
+            FlushWorkerNum:        c.Consistent.FlushWorkerNum,
+            Storage:               c.Consistent.Storage,
+            UseFileBackend:        c.Consistent.UseFileBackend,
+            Compression:           c.Consistent.Compression,
         }
     }
     if c.Sink != nil {
@@ -397,6 +401,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
             FileIndexWidth:           c.Sink.FileIndexWidth,
             EnableKafkaSinkV2:        c.Sink.EnableKafkaSinkV2,
             OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
+            ContentCompatible:        c.Sink.ContentCompatible,
             KafkaConfig:              kafkaConfig,
             MySQLConfig:              mysqlConfig,
             CloudStorageConfig:       cloudStorageConfig,
@@ -616,6 +621,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
             FileIndexWidth:           cloned.Sink.FileIndexWidth,
             EnableKafkaSinkV2:        cloned.Sink.EnableKafkaSinkV2,
             OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
+            ContentCompatible:        cloned.Sink.ContentCompatible,
             KafkaConfig:              kafkaConfig,
             MySQLConfig:              mysqlConfig,
             CloudStorageConfig:       cloudStorageConfig,
@@ -627,11 +633,15 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
     }
     if cloned.Consistent != nil {
         res.Consistent = &ConsistentConfig{
-            Level:             cloned.Consistent.Level,
-            MaxLogSize:        cloned.Consistent.MaxLogSize,
-            FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
-            Storage:           cloned.Consistent.Storage,
-            UseFileBackend:    cloned.Consistent.UseFileBackend,
+            Level:                 cloned.Consistent.Level,
+            MaxLogSize:            cloned.Consistent.MaxLogSize,
+            FlushIntervalInMs:     cloned.Consistent.FlushIntervalInMs,
+            MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
+            EncodingWorkerNum:     c.Consistent.EncodingWorkerNum,
+            FlushWorkerNum:        c.Consistent.FlushWorkerNum,
+            Storage:               cloned.Consistent.Storage,
+            UseFileBackend:        cloned.Consistent.UseFileBackend,
+            Compression:           cloned.Consistent.Compression,
         }
     }
     if cloned.Mounter != nil {
@@ -773,6 +783,7 @@ type SinkConfig struct {
     EnableKafkaSinkV2        bool                `json:"enable_kafka_sink_v2"`
     OnlyOutputUpdatedColumns *bool               `json:"only_output_updated_columns"`
     SafeMode                 *bool               `json:"safe_mode,omitempty"`
+    ContentCompatible        *bool               `json:"content_compatible"`
     KafkaConfig              *KafkaConfig        `json:"kafka_config,omitempty"`
     MySQLConfig              *MySQLConfig        `json:"mysql_config,omitempty"`
     CloudStorageConfig       *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
@@ -807,11 +818,15 @@ type ColumnSelector struct {
 // ConsistentConfig represents replication consistency config for a changefeed
 // This is a duplicate of config.ConsistentConfig
 type ConsistentConfig struct {
-    Level             string `json:"level"`
-    MaxLogSize        int64  `json:"max_log_size"`
-    FlushIntervalInMs int64  `json:"flush_interval"`
-    Storage           string `json:"storage"`
-    UseFileBackend    bool   `json:"use_file_backend"`
+    Level                 string `json:"level,omitempty"`
+    MaxLogSize            int64  `json:"max_log_size"`
+    FlushIntervalInMs     int64  `json:"flush_interval"`
+    MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
+    EncodingWorkerNum     int    `json:"encoding_worker_num"`
+    FlushWorkerNum        int    `json:"flush_worker_num"`
+    Storage               string `json:"storage,omitempty"`
+    UseFileBackend        bool   `json:"use_file_backend"`
+    Compression           string `json:"compression,omitempty"`
 }

 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
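For readers skimming the diff, the following self-contained sketch (not part of the commit) shows how the extended API-level ConsistentConfig serializes to JSON under the tags added above. The struct is copied from the diff; every value assigned to it is a made-up placeholder, and the real defaults come from the redo package constants referenced in model_test.go below.

package main

import (
	"encoding/json"
	"fmt"
)

// ConsistentConfig mirrors the API struct added in this commit (same fields
// and JSON tags); it is redeclared here only so the sketch compiles on its own.
type ConsistentConfig struct {
	Level                 string `json:"level,omitempty"`
	MaxLogSize            int64  `json:"max_log_size"`
	FlushIntervalInMs     int64  `json:"flush_interval"`
	MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
	EncodingWorkerNum     int    `json:"encoding_worker_num"`
	FlushWorkerNum        int    `json:"flush_worker_num"`
	Storage               string `json:"storage,omitempty"`
	UseFileBackend        bool   `json:"use_file_backend"`
	Compression           string `json:"compression,omitempty"`
}

func main() {
	// All values below are hypothetical, chosen only to make the output
	// readable; they are not the defaults used by TiCDC.
	cfg := ConsistentConfig{
		Level:                 "eventual",
		MaxLogSize:            64,
		FlushIntervalInMs:     2000,
		MetaFlushIntervalInMs: 200,
		EncodingWorkerNum:     16,
		FlushWorkerNum:        8,
		Storage:               "s3://redo-bucket/changefeed-1",
		UseFileBackend:        false,
		Compression:           "lz4",
	}
	out, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		panic(err)
	}
	// The printed JSON includes the new keys meta_flush_interval,
	// encoding_worker_num, flush_worker_num, and compression alongside
	// the pre-existing ones.
	fmt.Println(string(out))
}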
15 changes: 9 additions & 6 deletions cdc/api/v2/model_test.go
@@ -31,7 +31,7 @@ import (
 // note: this is api published default value, not change it
 var defaultAPIConfig = &ReplicaConfig{
     MemoryQuota:      config.DefaultChangefeedMemoryQuota,
-    CaseSensitive:    true,
+    CaseSensitive:    false,
     EnableOldValue:   true,
     CheckGCSafePoint: true,
     EnableSyncPoint:  false,
@@ -58,11 +58,14 @@ var defaultAPIConfig = &ReplicaConfig{
         AdvanceTimeoutInSec: util.AddressOf(uint(150)),
     },
     Consistent: &ConsistentConfig{
-        Level:             "none",
-        MaxLogSize:        64,
-        FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
-        Storage:           "",
-        UseFileBackend:    false,
+        Level:                 "none",
+        MaxLogSize:            64,
+        FlushIntervalInMs:     redo.DefaultFlushIntervalInMs,
+        MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
+        EncodingWorkerNum:     redo.DefaultEncodingWorkerNum,
+        FlushWorkerNum:        redo.DefaultFlushWorkerNum,
+        Storage:               "",
+        UseFileBackend:        false,
     },
     Scheduler: &ChangefeedSchedulerConfig{
         EnableTableAcrossNodes: config.GetDefaultReplicaConfig().