Skip to content

Commit

Permalink
add config
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Oct 25, 2023
1 parent 955999f commit 2ef4bc2
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 24 deletions.
27 changes: 15 additions & 12 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var cloudStorageConfig *config.CloudStorageConfig
if c.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
}
}

Expand Down Expand Up @@ -704,10 +705,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var cloudStorageConfig *CloudStorageConfig
if cloned.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
}
}

Expand Down Expand Up @@ -1180,10 +1182,11 @@ type MySQLConfig struct {

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
26 changes: 15 additions & 11 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,26 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
callbacks = append(callbacks, msg.Callback)
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
batchUploader, err := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: 16,
if err := d.statistics.RecordBatchExecution(func() (_ int, inErr error) {
if d.config.FlushConcurrency <= 1 {
return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if err != nil {
return 0, err
if inErr != nil {
return 0, inErr
}
defer func() {
err := batchUploader.Close(ctx)
if err != nil {
log.Error("failed to close batch uploader", zap.Error(err))
if inErr == nil {
inErr = writer.Close(ctx)
}
// TODO: maybe we should abort the MultipartUpload here.
}()
_, err = batchUploader.Write(ctx, buf.Bytes())
if err != nil {
return 0, err

if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, inErr
}
return rowsCnt, nil
}); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ type CloudStorageConfig struct {
FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"`
FileSize *int `toml:"file-size" json:"file-size,omitempty"`

OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
Expand Down
11 changes: 11 additions & 0 deletions pkg/sink/cloudstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ const (
minFlushInterval = 2 * time.Second
// the upper limit of flush-interval.
maxFlushInterval = 10 * time.Minute
// defaultFlushConcurrency is the default value of flush-concurrency.
defaultFlushConcurrency = 1
// the lower limit of flush-concurrency.
minFlushConcurrency = 1
// the upper limit of flush-concurrency.
maxFlushConcurrency = 512
// defaultFileSize is the default value of file-size.
defaultFileSize = 64 * 1024 * 1024
// the lower limit of file size
Expand All @@ -65,6 +71,7 @@ type Config struct {
DateSeparator string
EnablePartitionSeparator bool
OutputColumnID bool
FlushConcurrency int
}

// NewConfig returns the default cloud storage sink config.
Expand Down Expand Up @@ -117,11 +124,15 @@ func (c *Config) Apply(
c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth)
if replicaConfig.Sink.CloudStorageConfig != nil {
c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID)
c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency)
}

if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth {
c.FileIndexWidth = config.DefaultFileIndexWidth
}
if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency {
c.FlushConcurrency = defaultFlushConcurrency
}

return nil
}
Expand Down

0 comments on commit 2ef4bc2

Please sign in to comment.