diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 05a7ba86a2c..8812ce6371f 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -430,10 +430,20 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ +<<<<<<< HEAD WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, FileSize: c.Sink.CloudStorageConfig.FileSize, OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, +======= + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) } } @@ -710,10 +720,20 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ +<<<<<<< HEAD WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, FileSize: cloned.Sink.CloudStorageConfig.FileSize, OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, +======= + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) } } @@ -1192,10 +1212,20 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { +<<<<<<< HEAD WorkerCount *int `json:"worker_count,omitempty"` FlushInterval *string `json:"flush_interval,omitempty"` FileSize *int `json:"file_size,omitempty"` OutputColumnID *bool `json:"output_column_id,omitempty"` +======= + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index 7804c4497b2..8ca5b66ffc9 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -319,12 +319,14 @@ func TestGCAndCleanup(t *testing.T) { cancel() require.ErrorIs(t, eg.Wait(), context.Canceled) - m.Cleanup(ctx) - ret, err := extStorage.FileExists(ctx, getDeletedChangefeedMarker(changefeedID)) + cleanupCtx, cleanupCancel := context.WithCancel(context.Background()) + defer cleanupCancel() + m.Cleanup(cleanupCtx) + ret, err := extStorage.FileExists(cleanupCtx, getDeletedChangefeedMarker(changefeedID)) require.NoError(t, err) require.True(t, ret) cnt := 0 - extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + extStorage.WalkDir(cleanupCtx, nil, func(path string, size int64) error { cnt++ return nil }) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 7af258f4d53..00e7c1d6748 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -19,7 +19,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" @@ -27,6 +26,7 @@ import ( mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/prometheus/client_golang/prometheus" @@ -237,12 +237,47 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single callbacks = append(callbacks, msg.Callback) } +<<<<<<< HEAD if err := d.statistics.RecordBatchExecution(func() (int, error) { err := d.storage.WriteFile(ctx, path, buf.Bytes()) if err != nil { return 0, err } return rowsCnt, nil +======= + if err := d.statistics.RecordBatchExecution(func() (int, int64, error) { + start := time.Now() + defer d.metricFlushDuration.Observe(time.Since(start).Seconds()) + + if d.config.FlushConcurrency <= 1 { + return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) + } + + writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{ + Concurrency: d.config.FlushConcurrency, + }) + if inErr != nil { + return 0, 0, inErr + } + + defer func() { + closeErr := writer.Close(ctx) + if inErr != nil { + log.Error("failed to close writer", zap.Error(closeErr), + zap.Int("workerID", d.id), + zap.Any("table", task.tableInfo.TableName), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID)) + if inErr == nil { + inErr = closeErr + } + } + }() + if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil { + return 0, 0, inErr + } + return rowsCnt, bytesCnt, nil +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) }); err != nil { return err } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index da1174fe1e1..00d267f6995 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -574,7 +574,14 @@ type CloudStorageConfig struct { FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` FileSize *int `toml:"file-size" json:"file-size,omitempty"` +<<<<<<< HEAD OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` +======= + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` + FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` + FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 49d159330f5..3261ad777aa 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -42,6 +42,12 @@ const ( minFlushInterval = 2 * time.Second // the upper limit of flush-interval. maxFlushInterval = 10 * time.Minute + // defaultFlushConcurrency is the default value of flush-concurrency. + defaultFlushConcurrency = 1 + // the lower limit of flush-concurrency. + minFlushConcurrency = 1 + // the upper limit of flush-concurrency. + maxFlushConcurrency = 512 // defaultFileSize is the default value of file-size. defaultFileSize = 64 * 1024 * 1024 // the lower limit of file size @@ -65,6 +71,7 @@ type Config struct { DateSeparator string EnablePartitionSeparator bool OutputColumnID bool + FlushConcurrency int } // NewConfig returns the default cloud storage sink config. @@ -117,11 +124,24 @@ func (c *Config) Apply( c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth) if replicaConfig.Sink.CloudStorageConfig != nil { c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) +<<<<<<< HEAD +======= + if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil { + c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays + } + if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { + c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec + } + c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency) +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth } + if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency { + c.FlushConcurrency = defaultFlushConcurrency + } return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 863b5c648c4..5703ed0d4f7 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -32,6 +32,7 @@ func TestConfigApply(t *testing.T) { expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = config.DateSeparatorDay.String() expected.EnablePartitionSeparator = true + expected.FlushConcurrency = 1 uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err)