diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index af9887b0240..5d9cfc6a39a 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -425,10 +425,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, } } @@ -704,10 +705,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, } } @@ -1180,10 +1182,11 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index ac536bb889c..0390b3ce749 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -237,22 +237,26 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single callbacks = append(callbacks, msg.Callback) } - if err := d.statistics.RecordBatchExecution(func() (int, error) { - batchUploader, err := d.storage.Create(ctx, path, &storage.WriterOption{ - Concurrency: 16, + if err := d.statistics.RecordBatchExecution(func() (_ int, inErr error) { + if d.config.FlushConcurrency <= 1 { + return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) + } + + writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{ + Concurrency: d.config.FlushConcurrency, }) - if err != nil { - return 0, err + if inErr != nil { + return 0, inErr } defer func() { - err := batchUploader.Close(ctx) - if err != nil { - log.Error("failed to close batch uploader", zap.Error(err)) + if inErr == nil { + inErr = writer.Close(ctx) } + // TODO: maybe we should abort the MultipartUpload here. }() - _, err = batchUploader.Write(ctx, buf.Bytes()) - if err != nil { - return 0, err + + if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil { + return 0, inErr } return rowsCnt, nil }); err != nil { diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 44d59590db5..6b443bdf073 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -574,7 +574,8 @@ type CloudStorageConfig struct { FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` FileSize *int `toml:"file-size" json:"file-size,omitempty"` - OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 49d159330f5..ca7222520a2 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -42,6 +42,12 @@ const ( minFlushInterval = 2 * time.Second // the upper limit of flush-interval. maxFlushInterval = 10 * time.Minute + // defaultFlushConcurrency is the default value of flush-concurrency. + defaultFlushConcurrency = 1 + // the lower limit of flush-concurrency. + minFlushConcurrency = 1 + // the upper limit of flush-concurrency. + maxFlushConcurrency = 512 // defaultFileSize is the default value of file-size. defaultFileSize = 64 * 1024 * 1024 // the lower limit of file size @@ -65,6 +71,7 @@ type Config struct { DateSeparator string EnablePartitionSeparator bool OutputColumnID bool + FlushConcurrency int } // NewConfig returns the default cloud storage sink config. @@ -117,11 +124,15 @@ func (c *Config) Apply( c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth) if replicaConfig.Sink.CloudStorageConfig != nil { c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency) } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth } + if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency { + c.FlushConcurrency = defaultFlushConcurrency + } return nil }