diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go
index 05a7ba86a2c..f65f307342b 100644
--- a/cdc/api/v2/model.go
+++ b/cdc/api/v2/model.go
@@ -270,6 +270,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 			FlushWorkerNum:    c.Consistent.FlushWorkerNum,
 			Storage:           c.Consistent.Storage,
 			UseFileBackend:    c.Consistent.UseFileBackend,
+			Compression:       c.Consistent.Compression,
+			FlushConcurrency:  c.Consistent.FlushConcurrency,
 		}
 	}
 	if c.Sink != nil {
@@ -755,6 +757,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 			FlushWorkerNum:    c.Consistent.FlushWorkerNum,
 			Storage:           cloned.Consistent.Storage,
 			UseFileBackend:    cloned.Consistent.UseFileBackend,
+			Compression:       cloned.Consistent.Compression,
+			FlushConcurrency:  cloned.Consistent.FlushConcurrency,
 		}
 	}
 	if cloned.Mounter != nil {
@@ -950,6 +954,8 @@ type ConsistentConfig struct {
 	FlushWorkerNum   int    `json:"flush_worker_num"`
 	Storage          string `json:"storage,omitempty"`
 	UseFileBackend   bool   `json:"use_file_backend"`
+	Compression      string `json:"compression,omitempty"`
+	FlushConcurrency int    `json:"flush_concurrency,omitempty"`
 }
 
 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go
index 3258bf169c4..76c61ead69b 100644
--- a/cdc/redo/writer/memory/file_worker.go
+++ b/cdc/redo/writer/memory/file_worker.go
@@ -170,7 +170,15 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
 			return errors.Trace(egCtx.Err())
 		case file := <-f.flushCh:
 			start := time.Now()
-			err := f.extStorage.WriteFile(egCtx, file.filename, file.data)
+			if err := file.writer.Close(); err != nil {
+				return errors.Trace(err)
+			}
+			var err error
+			if f.cfg.FlushConcurrency <= 1 {
+				err = f.extStorage.WriteFile(egCtx, file.filename, file.writer.buf.Bytes())
+			} else {
+				err = f.multiPartUpload(egCtx, file)
+			}
 			f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
 			if err != nil {
 				return errors.Trace(err)
@@ -184,6 +192,19 @@
 	}
 }
 
+func (f *fileWorkerGroup) multiPartUpload(ctx context.Context, file *fileCache) error {
+	multipartWrite, err := f.extStorage.Create(ctx, file.filename, &storage.WriterOption{
+		Concurrency: f.cfg.FlushConcurrency,
+	})
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if _, err = multipartWrite.Write(ctx, file.writer.buf.Bytes()); err != nil {
+		return errors.Trace(err)
+	}
+	return errors.Trace(multipartWrite.Close(ctx))
+}
+
 func (f *fileWorkerGroup) bgWriteLogs(
 	egCtx context.Context, inputCh <-chan *polymorphicRedoEvent,
 ) (err error) {
diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go
index 14021e2074a..32e2bed2352 100644
--- a/pkg/config/consistent.go
+++ b/pkg/config/consistent.go
@@ -32,6 +32,8 @@ type ConsistentConfig struct {
 	FlushWorkerNum   int    `toml:"flush-worker-num" json:"flush-worker-num"`
 	Storage          string `toml:"storage" json:"storage"`
 	UseFileBackend   bool   `toml:"use-file-backend" json:"use-file-backend"`
+	Compression      string `toml:"compression" json:"compression"`
+	FlushConcurrency int    `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
 }
 
 // ValidateAndAdjust validates the consistency config and adjusts it if necessary.
diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go
index 19be69c0126..d4786fb240a 100644
--- a/pkg/util/external_storage.go
+++ b/pkg/util/external_storage.go
@@ -196,8 +196,12 @@ func (s *extStorageWithTimeout) WalkDir(
 func (s *extStorageWithTimeout) Create(
 	ctx context.Context, path string, option *storage.WriterOption,
 ) (storage.ExternalFileWriter, error) {
-	ctx, cancel := context.WithTimeout(ctx, s.timeout)
-	defer cancel()
+	if option.Concurrency <= 1 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, s.timeout)
+		defer cancel()
+	}
+	// multipart uploading spawns a background goroutine, can't set timeout
 	return s.ExternalStorage.Create(ctx, path, option)
 }
 
diff --git a/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml
index 7edf7be7d69..ff99477c9da 100644
--- a/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml
+++ b/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml
@@ -1,3 +1,4 @@
 [consistent]
 level = "eventual"
 storage = "s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/"
+flush-concurrency = 2
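Note on the flush path above: with `flush-concurrency` at its default (<= 1) the redo flusher keeps using a single `WriteFile` PUT; larger values switch to the multipart writer, whose background uploader goroutines are also why `extStorageWithTimeout.Create` no longer wraps the context in a timeout for that case. Below is a minimal sketch of the same dispatch, assuming only the br `storage` APIs already used in the patch (`WriteFile`, `Create`, `WriterOption.Concurrency`); the `upload` helper is hypothetical, not part of the patch.

```go
package redoexample

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// upload mirrors the dispatch in bgFlushFileCache: a plain PUT for the
// default configuration, a concurrent multipart upload otherwise.
// (Illustrative helper, not part of the patch.)
func upload(
	ctx context.Context, s storage.ExternalStorage,
	name string, data []byte, concurrency int,
) error {
	if concurrency <= 1 {
		// Single request; safe to run under a caller-supplied timeout.
		return s.WriteFile(ctx, name, data)
	}
	// Create returns an ExternalFileWriter that uploads parts using
	// `concurrency` background workers, so the surrounding context must
	// stay alive (not cut short by a timeout) until Close finishes the upload.
	w, err := s.Create(ctx, name, &storage.WriterOption{Concurrency: concurrency})
	if err != nil {
		return err
	}
	if _, err := w.Write(ctx, data); err != nil {
		return err
	}
	return w.Close(ctx)
}
```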