
Commit

add config
CharlesCheung96 committed Nov 26, 2023
1 parent 587b155 commit bfca379
Showing 5 changed files with 99 additions and 14 deletions.
56 changes: 43 additions & 13 deletions cdc/api/v2/model.go
@@ -336,6 +336,16 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
LargeMessageHandle: largeMessageHandle,
}
}

if c.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
}
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
@@ -465,6 +475,16 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
LargeMessageHandle: largeMessageHandle,
}
}

if cloned.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
}
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
@@ -586,19 +606,20 @@ type Table struct {
// SinkConfig represents sink config for a changefeed
// This is a duplicate of config.SinkConfig
type SinkConfig struct {
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
KafkaConfig *KafkaConfig `json:"kafka_config"`
AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"`
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
KafkaConfig *KafkaConfig `json:"kafka_config"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"`
}

// KafkaConfig represents kafka config for a changefeed.
@@ -615,6 +636,15 @@ type KafkaConfig struct {
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
}

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
}

// CSVConfig denotes the csv config
// This is the same as config.CSVConfig
type CSVConfig struct {
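
To make the wire format of the new field concrete, here is a minimal, self-contained sketch: it re-declares the v2 API `CloudStorageConfig` locally and uses an ad hoc `ptr` helper (both only for illustration; the authoritative definitions are the ones in this diff), then marshals it to JSON. Unset pointer fields are dropped by `omitempty`, and set fields appear under their snake_case keys.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the new v2 API type, re-declared here only for illustration;
// the real definition lives in cdc/api/v2/model.go.
type CloudStorageConfig struct {
	WorkerCount      *int    `json:"worker_count,omitempty"`
	FlushInterval    *string `json:"flush_interval,omitempty"`
	FileSize         *int    `json:"file_size,omitempty"`
	FlushConcurrency *int    `json:"flush_concurrency,omitempty"`
	OutputColumnID   *bool   `json:"output_column_id,omitempty"`
}

// ptr is an ad hoc helper for taking the address of a literal.
func ptr[T any](v T) *T { return &v }

func main() {
	cfg := CloudStorageConfig{
		WorkerCount:      ptr(16),
		FlushInterval:    ptr("5s"),
		FlushConcurrency: ptr(4),
		// FileSize and OutputColumnID are left nil and are omitted from the JSON.
	}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	fmt.Println(string(out))
	// {
	//   "worker_count": 16,
	//   "flush_interval": "5s",
	//   "flush_concurrency": 4
	// }
}
```
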
13 changes: 12 additions & 1 deletion pkg/config/sink.go
@@ -131,7 +131,8 @@ type SinkConfig struct {
EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"`
FileIndexWidth int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"`

KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"`
KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"`
CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"`

// TiDBSourceID is the source ID of the upstream TiDB,
// which is used to set the `tidb_cdc_write_source` session variable.
@@ -542,3 +543,13 @@ func (c *LargeMessageHandleConfig) Disabled() bool {
}
return c.LargeMessageHandleOption == LargeMessageHandleOptionNone
}

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"`
FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"`
FileSize *int `toml:"file-size" json:"file-size,omitempty"`

FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
}
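
For a sense of how the kebab-case `toml` tags read in a changefeed config file, the hedged sketch below decodes a `[sink.cloud-storage-config]` table into stripped-down local stand-ins for these structs. It uses `github.com/BurntSushi/toml` purely for illustration, not as a claim about TiCDC's actual decoding path, and the real `SinkConfig` has many more fields.

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Minimal stand-ins for the structs in pkg/config/sink.go, kept only to show
// how the kebab-case cloud-storage-config table decodes.
type cloudStorageConfig struct {
	WorkerCount      *int    `toml:"worker-count"`
	FlushInterval    *string `toml:"flush-interval"`
	FileSize         *int    `toml:"file-size"`
	FlushConcurrency *int    `toml:"flush-concurrency"`
	OutputColumnID   *bool   `toml:"output-column-id"`
}

type sinkConfig struct {
	CloudStorageConfig *cloudStorageConfig `toml:"cloud-storage-config"`
}

type replicaConfig struct {
	Sink sinkConfig `toml:"sink"`
}

const changefeedTOML = `
[sink.cloud-storage-config]
worker-count = 16
flush-interval = "5s"
file-size = 67108864
flush-concurrency = 4
output-column-id = true
`

func main() {
	var cfg replicaConfig
	if _, err := toml.Decode(changefeedTOML, &cfg); err != nil {
		panic(err)
	}
	csc := cfg.Sink.CloudStorageConfig
	fmt.Println(*csc.WorkerCount, *csc.FlushInterval, *csc.FlushConcurrency) // 16 5s 4
}
```

Since every field is a pointer, a key that is absent from the file simply leaves the corresponding field nil.
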
16 changes: 16 additions & 0 deletions pkg/sink/cloudstorage/config.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
psink "github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

@@ -39,6 +40,12 @@ const (
minFlushInterval = 2 * time.Second
// the upper limit of flush-interval.
maxFlushInterval = 10 * time.Minute
// defaultFlushConcurrency is the default value of flush-concurrency.
defaultFlushConcurrency = 1
// the lower limit of flush-concurrency.
minFlushConcurrency = 1
// the upper limit of flush-concurrency.
maxFlushConcurrency = 512
// defaultFileSize is the default value of file-size.
defaultFileSize = 64 * 1024 * 1024
// the lower limit of file size
Expand All @@ -55,6 +62,8 @@ type Config struct {
FileIndexWidth int
DateSeparator string
EnablePartitionSeparator bool
OutputColumnID bool
FlushConcurrency int
}

// NewConfig returns the default cloud storage sink config.
@@ -98,10 +107,17 @@ func (c *Config) Apply(
c.DateSeparator = replicaConfig.Sink.DateSeparator
c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator
c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth
if replicaConfig.Sink.CloudStorageConfig != nil {
c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID)
c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency)
}

if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth {
c.FileIndexWidth = config.DefaultFileIndexWidth
}
if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency {
c.FlushConcurrency = defaultFlushConcurrency
}

return nil
}
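
As a quick illustration of the validation added to `Config.Apply`, this standalone toy reproduces just the flush-concurrency bounds check with the constants copied from the diff above: anything outside the `[1, 512]` range, including the zero value you get when `cloud-storage-config` is omitted, falls back to the default of 1.

```go
package main

import "fmt"

// clampFlushConcurrency mirrors the bounds check added in Config.Apply:
// values outside [minFlushConcurrency, maxFlushConcurrency] fall back to
// defaultFlushConcurrency. The constants are copied from the diff above.
func clampFlushConcurrency(v int) int {
	const (
		defaultFlushConcurrency = 1
		minFlushConcurrency     = 1
		maxFlushConcurrency     = 512
	)
	if v < minFlushConcurrency || v > maxFlushConcurrency {
		return defaultFlushConcurrency
	}
	return v
}

func main() {
	fmt.Println(clampFlushConcurrency(0))    // 1  (unset config yields zero, reset to default)
	fmt.Println(clampFlushConcurrency(8))    // 8  (in range, kept as-is)
	fmt.Println(clampFlushConcurrency(1024)) // 1  (over the cap, reset to default)
}
```

This is also why the test change below expects `FlushConcurrency` to be 1 when the sink URI and replica config do not set it.
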
1 change: 1 addition & 0 deletions pkg/sink/cloudstorage/config_test.go
@@ -31,6 +31,7 @@ func TestConfigApply(t *testing.T) {
expected.FileIndexWidth = config.DefaultFileIndexWidth
expected.DateSeparator = config.DateSeparatorNone.String()
expected.EnablePartitionSeparator = true
expected.FlushConcurrency = 1
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv"
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
27 changes: 27 additions & 0 deletions pkg/util/pointer.go
@@ -0,0 +1,27 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

// GetOrZero returns the value pointed to by p, or a zero value of
// its type if p is nil.
func GetOrZero[T any](p *T) T {
var val T
if p == nil {
return val
}
return *p
}

// AddressOf returns the address of the given input variable.
func AddressOf[T any](v T) *T { return &v }
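
Finally, a small usage sketch for the two generic helpers added in `pkg/util/pointer.go`, assuming they are imported via the `github.com/pingcap/tiflow/pkg/util` path that `pkg/sink/cloudstorage/config.go` already uses in this commit:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/util"
)

func main() {
	// AddressOf turns a literal into a pointer in one step, which is handy
	// for populating the optional (pointer-typed) fields in CloudStorageConfig.
	flushConcurrency := util.AddressOf(4)

	// GetOrZero dereferences safely: a nil pointer yields the zero value,
	// so callers never have to nil-check each field individually.
	fmt.Println(util.GetOrZero(flushConcurrency)) // 4

	var unset *int
	fmt.Println(util.GetOrZero(unset)) // 0
}
```

`GetOrZero` is what lets `Config.Apply` read the optional pointer-typed fields without a nil check per field, and `AddressOf` is the natural way for callers to populate them.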
