From 4d7324d0b8ba3195cebc1ea795d2ef9a941061e7 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 29 Nov 2023 14:58:18 +0800 Subject: [PATCH] sink(ticdc): use multi part s3 uploader in storage sink (#9954) close pingcap/tiflow#10098, close pingcap/tiflow#10172 --- cdc/api/v2/model.go | 3 +++ cdc/redo/meta_manager_test.go | 8 +++--- cdc/sink/dmlsink/cloudstorage/dml_worker.go | 29 ++++++++++++++++++--- pkg/config/sink.go | 1 + pkg/sink/cloudstorage/config.go | 11 ++++++++ pkg/sink/cloudstorage/config_test.go | 1 + 6 files changed, 47 insertions(+), 6 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 8f3628238d1..e32ca7d7f69 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -454,6 +454,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, } } @@ -742,6 +743,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, } } @@ -1261,6 +1263,7 @@ type CloudStorageConfig struct { OutputColumnID *bool `json:"output_column_id,omitempty"` FileExpirationDays *int `json:"file_expiration_days,omitempty"` FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index 7804c4497b2..8ca5b66ffc9 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -319,12 +319,14 @@ func TestGCAndCleanup(t *testing.T) { cancel() require.ErrorIs(t, eg.Wait(), context.Canceled) - m.Cleanup(ctx) - ret, err := extStorage.FileExists(ctx, getDeletedChangefeedMarker(changefeedID)) + cleanupCtx, cleanupCancel := context.WithCancel(context.Background()) + defer cleanupCancel() + m.Cleanup(cleanupCtx) + ret, err := extStorage.FileExists(cleanupCtx, getDeletedChangefeedMarker(changefeedID)) require.NoError(t, err) require.True(t, ret) cnt := 0 - extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + extStorage.WalkDir(cleanupCtx, nil, func(path string, size int64) error { cnt++ return nil }) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 048b3d317a9..e6970cf6c9c 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -239,9 +239,32 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single } if err := d.statistics.RecordBatchExecution(func() (int, int64, error) { - err := d.storage.WriteFile(ctx, path, buf.Bytes()) - if err != nil { - return 0, 0, err + if d.config.FlushConcurrency <= 1 { + return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) + } + + writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{ + Concurrency: d.config.FlushConcurrency, + }) + if inErr != nil { + return 0, 0, inErr + } + + defer func() { + closeErr := writer.Close(ctx) + if inErr != nil { + log.Error("failed to close writer", zap.Error(closeErr), + zap.Int("workerID", d.id), + zap.Any("table", task.tableInfo.TableName), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID)) + if inErr == nil { + inErr = closeErr + } + } + }() + if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil { + return 0, 0, inErr } return rowsCnt, bytesCnt, nil }); err != nil { diff --git a/pkg/config/sink.go b/pkg/config/sink.go index cc6bb9877b5..80f4d87731a 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -593,6 +593,7 @@ type CloudStorageConfig struct { OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` + FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 9ed6ca04c4b..628e0d77ecf 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -42,6 +42,12 @@ const ( minFlushInterval = 2 * time.Second // the upper limit of flush-interval. maxFlushInterval = 10 * time.Minute + // defaultFlushConcurrency is the default value of flush-concurrency. + defaultFlushConcurrency = 1 + // the lower limit of flush-concurrency. + minFlushConcurrency = 1 + // the upper limit of flush-concurrency. + maxFlushConcurrency = 512 // defaultFileSize is the default value of file-size. defaultFileSize = 64 * 1024 * 1024 // the lower limit of file size @@ -73,6 +79,7 @@ type Config struct { FileCleanupCronSpec string EnablePartitionSeparator bool OutputColumnID bool + FlushConcurrency int } // NewConfig returns the default cloud storage sink config. @@ -133,11 +140,15 @@ func (c *Config) Apply( if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec } + c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency) } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth } + if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency { + c.FlushConcurrency = defaultFlushConcurrency + } return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 863b5c648c4..5703ed0d4f7 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -32,6 +32,7 @@ func TestConfigApply(t *testing.T) { expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = config.DateSeparatorDay.String() expected.EnablePartitionSeparator = true + expected.FlushConcurrency = 1 uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err)