Skip to content

Commit

Permalink
Support the `content-compatible` sink option in the changefeed config file.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 20, 2023
1 parent 0f22efc commit 25c9e42
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 7 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: c.Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: c.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
PulsarConfig: pulsarConfig,
Expand Down Expand Up @@ -731,6 +732,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: cloned.Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: cloned.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
PulsarConfig: pulsarConfig,
Expand Down Expand Up @@ -896,6 +898,7 @@ type SinkConfig struct {
EnableKafkaSinkV2 *bool `json:"enable_kafka_sink_v2,omitempty"`
OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns,omitempty"`
DeleteOnlyOutputHandleKeyColumns *bool `json:"delete_only_output_handle_key_columns"`
ContentCompatible *bool `json:"content_compatible"`
SafeMode *bool `json:"safe_mode,omitempty"`
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
PulsarConfig *PulsarConfig `json:"pulsar_config,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func (info *ChangeFeedInfo) rmMQOnlyFields() {
info.Config.Sink.EnableKafkaSinkV2 = nil
info.Config.Sink.OnlyOutputUpdatedColumns = nil
info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil
info.Config.Sink.ContentCompatible = nil
info.Config.Sink.KafkaConfig = nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
Protocol: util.AddressOf("open-protocol"),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
}, cfg.Sink)
Expand Down Expand Up @@ -246,6 +247,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
},
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
}, cfg.Sink)
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ const (
}
],
"enable-partition-separator": true,
"protocol": "open-protocol",
"protocol": "canal-json",
"enable-kafka-sink-v2": false,
"only-output-updated-columns": false,
"delete-only-output-handle-key-columns": false,
"content-compatible": false,
"large-message-handle": {
"large-message-handle-option": "none",
"large-message-handle-compression": "",
Expand Down Expand Up @@ -199,7 +200,7 @@ const (
},
"sink": {
"encoder-concurrency": 32,
"protocol": "open-protocol",
"protocol": "canal-json",
"column-selectors": [
{
"matcher": [
Expand All @@ -223,6 +224,7 @@ const (
"enable-kafka-sink-v2": true,
"only-output-updated-columns": true,
"delete-only-output-handle-key-columns": true,
"content-compatible": true,
"safe-mode": true,
"terminator": "\r\n",
"transaction-atomicity": "",
Expand Down Expand Up @@ -350,7 +352,7 @@ const (
"sink": {
"encoder-concurrency": 32,
"dispatchers": null,
"protocol": "open-protocol",
"protocol": "canal-json",
"column-selectors": [
{
"matcher": [
Expand All @@ -376,6 +378,7 @@ const (
"enable-kafka-sink-v2": true,
"only-output-updated-columns": true,
"delete-only-output-handle-key-columns": true,
"content-compatible": true,
"safe-mode": true,
"kafka-config": {
"partition-num": 1,
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var defaultReplicaConfig = &ReplicaConfig{
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
TiDBSourceID: 1,
AdvanceTimeoutInSec: util.AddressOf(DefaultAdvanceTimeoutInSec),
},
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
conf.ForceReplicate = true
conf.Filter.Rules = []string{"1.1"}
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = util.AddressOf("open-protocol")
conf.Sink.Protocol = util.AddressOf("canal-json")
conf.Sink.ColumnSelectors = []*ColumnSelector{
{
Matcher: []string{"1.1"},
Expand All @@ -66,6 +66,7 @@ func TestReplicaConfigMarshal(t *testing.T) {

conf.Sink.OnlyOutputUpdatedColumns = aws.Bool(true)
conf.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true)
conf.Sink.ContentCompatible = aws.Bool(true)
conf.Sink.SafeMode = aws.Bool(true)
conf.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(150))
conf.Sink.KafkaConfig = &KafkaConfig{
Expand Down Expand Up @@ -177,7 +178,7 @@ func TestReplicaConfigOutDated(t *testing.T) {
conf.ForceReplicate = true
conf.Filter.Rules = []string{"1.1"}
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = util.AddressOf("open-protocol")
conf.Sink.Protocol = util.AddressOf("canal-json")
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, DispatcherRule: "r1"},
{Matcher: []string{"a.c"}, DispatcherRule: "r2"},
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type SinkConfig struct {
// DeleteOnlyOutputHandleKeyColumns is only available when the downstream is MQ.
DeleteOnlyOutputHandleKeyColumns *bool `toml:"delete-only-output-handle-key-columns" json:"delete-only-output-handle-key-columns,omitempty"`

// ContentCompatible is only available when the downstream is MQ.
ContentCompatible *bool `toml:"content-compatible" json:"content-compatible,omitempty"`

// TiDBSourceID is the source ID of the upstream TiDB,
// which is used to set the `tidb_cdc_write_source` session variable.
// Note: This field is only used internally and only used in the MySQL sink.
Expand Down
7 changes: 5 additions & 2 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ type urlConfig struct {

AvroSchemaRegistry string `form:"schema-registry"`
OnlyOutputUpdatedColumns *bool `form:"only-output-updated-columns"`

ContentCompatible *bool `form:"content-compatible"`
ContentCompatible *bool `form:"content-compatible"`
}

// Apply fill the Config
Expand Down Expand Up @@ -234,6 +233,10 @@ func mergeConfig(
if replicaConfig.Sink != nil {
dest.AvroSchemaRegistry = util.GetOrZero(replicaConfig.Sink.SchemaRegistry)
dest.OnlyOutputUpdatedColumns = replicaConfig.Sink.OnlyOutputUpdatedColumns
dest.ContentCompatible = replicaConfig.Sink.ContentCompatible
if util.GetOrZero(dest.ContentCompatible) {
dest.OnlyOutputUpdatedColumns = util.AddressOf(true)
}
if replicaConfig.Sink.KafkaConfig != nil {
dest.MaxMessageBytes = replicaConfig.Sink.KafkaConfig.MaxMessageBytes
if replicaConfig.Sink.KafkaConfig.CodecConfig != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/sink/codec/common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func TestMergeConfig(t *testing.T) {
require.Equal(t, "abc", c.AvroConfluentSchemaRegistry)
require.True(t, c.OnlyOutputUpdatedColumns)
require.True(t, c.AvroEnableWatermark)
require.False(t, c.ContentCompatible)
require.Equal(t, "ab", c.AvroBigintUnsignedHandlingMode)
require.Equal(t, "cd", c.AvroDecimalHandlingMode)
require.Equal(t, 123, c.MaxMessageBytes)
Expand Down Expand Up @@ -413,6 +414,7 @@ func TestMergeConfig(t *testing.T) {
require.Equal(t, "abc", c.AvroConfluentSchemaRegistry)
require.True(t, c.OnlyOutputUpdatedColumns)
require.True(t, c.AvroEnableWatermark)
require.False(t, c.ContentCompatible)
require.Equal(t, "ab", c.AvroBigintUnsignedHandlingMode)
require.Equal(t, "cd", c.AvroDecimalHandlingMode)
require.Equal(t, 123, c.MaxMessageBytes)
Expand Down Expand Up @@ -451,11 +453,23 @@ func TestMergeConfig(t *testing.T) {
require.Equal(t, "abc", c.AvroConfluentSchemaRegistry)
require.True(t, c.OnlyOutputUpdatedColumns)
require.True(t, c.AvroEnableWatermark)
require.False(t, c.ContentCompatible)
require.Equal(t, "ab", c.AvroBigintUnsignedHandlingMode)
require.Equal(t, "cd", c.AvroDecimalHandlingMode)
require.Equal(t, 123, c.MaxMessageBytes)
require.Equal(t, 456, c.MaxBatchSize)
require.Equal(t, c.LargeMessageHandle.LargeMessageHandleOption, config.LargeMessageHandleOptionClaimCheck)

replicaConfig = config.GetDefaultReplicaConfig()
replicaConfig.Sink.ContentCompatible = aws.Bool(true)
uri = "kafka://127.0.0.1:9092/content-compatible?protocol=canal-json"
sinkURI, err = url.Parse(uri)
require.NoError(t, err)
c = NewConfig(config.ProtocolCanalJSON)
err = c.Apply(sinkURI, replicaConfig)
require.NoError(t, err)
require.True(t, c.ContentCompatible)
require.True(t, c.OnlyOutputUpdatedColumns)
}

func TestApplyConfig4CanalJSON(t *testing.T) {
Expand Down

0 comments on commit 25c9e42

Please sign in to comment.