From 487cd7dc731fa3d08c2f9ac04d6cef14ef7d6d0d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 23 Oct 2023 17:56:31 +0800 Subject: [PATCH] use multi uploader --- cdc/api/v2/model.go | 27 +++++++++-------- cdc/sink/dmlsink/cloudstorage/dml_worker.go | 33 +++++++++++++++++---- pkg/config/sink.go | 3 +- pkg/sink/cloudstorage/config.go | 11 +++++++ pkg/sink/cloudstorage/config_test.go | 1 + 5 files changed, 57 insertions(+), 18 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 05a7ba86a2c..ccaea97104f 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -430,10 +430,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, } } @@ -710,10 +711,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, } } @@ -1192,10 +1194,11 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 7af258f4d53..d3833f2913a 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -19,7 +19,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" @@ -27,6 +26,7 @@ import ( mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/prometheus/client_golang/prometheus" @@ -237,10 +237,33 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single callbacks = append(callbacks, msg.Callback) } - if err := d.statistics.RecordBatchExecution(func() (int, error) { - err := d.storage.WriteFile(ctx, path, buf.Bytes()) - if err != nil { - return 0, err + if err := d.statistics.RecordBatchExecution(func() (_ int, inErr error) { + if d.config.FlushConcurrency <= 1 { + return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) + } + + writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{ + Concurrency: d.config.FlushConcurrency, + }) + if inErr != nil { + return 0, inErr + } + + defer func() { + closeErr := writer.Close(ctx) + if inErr != nil { + log.Error("failed to close writer", zap.Error(closeErr), + zap.Int("workerID", d.id), + zap.Any("table", task.tableInfo.TableName), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID)) + if inErr == nil { + inErr = closeErr + } + } + }() + if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil { + return 0, inErr } return rowsCnt, nil }); err != nil { diff --git a/pkg/config/sink.go b/pkg/config/sink.go index da1174fe1e1..f294256b31a 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -574,7 +574,8 @@ type CloudStorageConfig struct { FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` FileSize *int `toml:"file-size" json:"file-size,omitempty"` - OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 49d159330f5..ca7222520a2 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -42,6 +42,12 @@ const ( minFlushInterval = 2 * time.Second // the upper limit of flush-interval. maxFlushInterval = 10 * time.Minute + // defaultFlushConcurrency is the default value of flush-concurrency. + defaultFlushConcurrency = 1 + // the lower limit of flush-concurrency. + minFlushConcurrency = 1 + // the upper limit of flush-concurrency. + maxFlushConcurrency = 512 // defaultFileSize is the default value of file-size. defaultFileSize = 64 * 1024 * 1024 // the lower limit of file size @@ -65,6 +71,7 @@ type Config struct { DateSeparator string EnablePartitionSeparator bool OutputColumnID bool + FlushConcurrency int } // NewConfig returns the default cloud storage sink config. @@ -117,11 +124,15 @@ func (c *Config) Apply( c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth) if replicaConfig.Sink.CloudStorageConfig != nil { c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency) } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth } + if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency { + c.FlushConcurrency = defaultFlushConcurrency + } return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 863b5c648c4..5703ed0d4f7 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -32,6 +32,7 @@ func TestConfigApply(t *testing.T) { expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = config.DateSeparatorDay.String() expected.EnablePartitionSeparator = true + expected.FlushConcurrency = 1 uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err)