diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 267e5b6cef0..27c7e12aa4f 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -336,6 +336,16 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( LargeMessageHandle: largeMessageHandle, } } + + if c.Sink.CloudStorageConfig != nil { + res.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + } + } } if c.Mounter != nil { res.Mounter = &config.MounterConfig{ @@ -465,6 +475,16 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { LargeMessageHandle: largeMessageHandle, } } + + if cloned.Sink.CloudStorageConfig != nil { + res.Sink.CloudStorageConfig = &CloudStorageConfig{ + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + } + } } if cloned.Consistent != nil { res.Consistent = &ConsistentConfig{ @@ -586,19 +606,20 @@ type Table struct { // SinkConfig represents sink config for a changefeed // This is a duplicate of config.SinkConfig type SinkConfig struct { - Protocol string `json:"protocol"` - SchemaRegistry string `json:"schema_registry"` - CSVConfig *CSVConfig `json:"csv"` - DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` - ColumnSelectors []*ColumnSelector `json:"column_selectors"` - TxnAtomicity string `json:"transaction_atomicity"` - EncoderConcurrency int `json:"encoder_concurrency"` - Terminator string `json:"terminator"` - DateSeparator string `json:"date_separator"` - EnablePartitionSeparator bool 
`json:"enable_partition_separator"` - FileIndexWidth int `json:"file_index_width"` - KafkaConfig *KafkaConfig `json:"kafka_config"` - AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"` + Protocol string `json:"protocol"` + SchemaRegistry string `json:"schema_registry"` + CSVConfig *CSVConfig `json:"csv"` + DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` + ColumnSelectors []*ColumnSelector `json:"column_selectors"` + TxnAtomicity string `json:"transaction_atomicity"` + EncoderConcurrency int `json:"encoder_concurrency"` + Terminator string `json:"terminator"` + DateSeparator string `json:"date_separator"` + EnablePartitionSeparator bool `json:"enable_partition_separator"` + FileIndexWidth int `json:"file_index_width"` + KafkaConfig *KafkaConfig `json:"kafka_config"` + CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"` + AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"` } // KafkaConfig represents kafka config for a changefeed. @@ -615,6 +636,15 @@ type KafkaConfig struct { LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` } +// CloudStorageConfig represents a cloud storage sink configuration +type CloudStorageConfig struct { + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` +} + // CSVConfig denotes the csv config // This is the same as config.CSVConfig type CSVConfig struct { diff --git a/pkg/config/sink.go b/pkg/config/sink.go index a796f8372ac..43f90e36ce9 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -131,7 +131,8 @@ type SinkConfig struct { EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"` FileIndexWidth int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"` - 
KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` + KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` + CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"` // TiDBSourceID is the source ID of the upstream TiDB, // which is used to set the `tidb_cdc_write_source` session variable. @@ -542,3 +543,13 @@ func (c *LargeMessageHandleConfig) Disabled() bool { } return c.LargeMessageHandleOption == LargeMessageHandleOptionNone } + +// CloudStorageConfig represents a cloud storage sink configuration +type CloudStorageConfig struct { + WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` + FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` + FileSize *int `toml:"file-size" json:"file-size,omitempty"` + + FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` +} diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 75cd00e186b..a8a28f7a2d7 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" psink "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -39,6 +40,12 @@ const ( minFlushInterval = 2 * time.Second // the upper limit of flush-interval. maxFlushInterval = 10 * time.Minute + // defaultFlushConcurrency is the default value of flush-concurrency. + defaultFlushConcurrency = 1 + // the lower limit of flush-concurrency. + minFlushConcurrency = 1 + // the upper limit of flush-concurrency. + maxFlushConcurrency = 512 // defaultFileSize is the default value of file-size. 
defaultFileSize = 64 * 1024 * 1024 // the lower limit of file size @@ -55,6 +62,8 @@ type Config struct { FileIndexWidth int DateSeparator string EnablePartitionSeparator bool + OutputColumnID bool + FlushConcurrency int } // NewConfig returns the default cloud storage sink config. @@ -98,10 +107,17 @@ func (c *Config) Apply( c.DateSeparator = replicaConfig.Sink.DateSeparator c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth + if replicaConfig.Sink.CloudStorageConfig != nil { + c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency) + } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth } + if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency { + c.FlushConcurrency = defaultFlushConcurrency + } return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 994219c74b5..906131965f6 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -31,6 +31,7 @@ func TestConfigApply(t *testing.T) { expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = config.DateSeparatorNone.String() expected.EnablePartitionSeparator = true + expected.FlushConcurrency = 1 uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err) diff --git a/pkg/util/pointer.go b/pkg/util/pointer.go new file mode 100644 index 00000000000..c047548b88f --- /dev/null +++ b/pkg/util/pointer.go @@ -0,0 +1,27 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +// GetOrZero returns the value pointed to by p, or the zero value of +// its type if p is nil. +func GetOrZero[T any](p *T) T { + var val T + if p == nil { + return val + } + return *p +} + +// AddressOf returns a pointer to its own copy of v (v is passed by value), not to the caller's variable. +func AddressOf[T any](v T) *T { return &v }