From 8f921527fd8bc9815b9aac0e70597cc4ec7f7598 Mon Sep 17 00:00:00 2001
From: Jianyuan Jiang
Date: Sat, 2 Dec 2023 14:22:19 +0800
Subject: [PATCH 1/2] This is an automated cherry-pick of #10227

Signed-off-by: ti-chi-bot
---
 cdc/api/v2/model.go                   | 39 +++++++++++++++++++
 cdc/redo/writer/memory/file_worker.go | 25 ++++++++++++
 pkg/config/consistent.go              | 13 +++++++
 pkg/util/external_storage.go          | 10 +++++
 .../conf/changefeed.toml              |  1 +
 5 files changed, 88 insertions(+)

diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go
index 3206f2f1899..f15a1ae1f77 100644
--- a/cdc/api/v2/model.go
+++ b/cdc/api/v2/model.go
@@ -260,11 +260,24 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
     }
     if c.Consistent != nil {
         res.Consistent = &config.ConsistentConfig{
+<<<<<<< HEAD
             Level:             c.Consistent.Level,
             MaxLogSize:        c.Consistent.MaxLogSize,
             FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
             Storage:           c.Consistent.Storage,
             UseFileBackend:    c.Consistent.UseFileBackend,
+=======
+            Level:                 c.Consistent.Level,
+            MaxLogSize:            c.Consistent.MaxLogSize,
+            FlushIntervalInMs:     c.Consistent.FlushIntervalInMs,
+            MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
+            EncodingWorkerNum:     c.Consistent.EncodingWorkerNum,
+            FlushWorkerNum:        c.Consistent.FlushWorkerNum,
+            Storage:               c.Consistent.Storage,
+            UseFileBackend:        c.Consistent.UseFileBackend,
+            Compression:           c.Consistent.Compression,
+            FlushConcurrency:      c.Consistent.FlushConcurrency,
+>>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
         }
     }
     if c.Sink != nil {
@@ -627,11 +640,24 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
     }
     if cloned.Consistent != nil {
         res.Consistent = &ConsistentConfig{
+<<<<<<< HEAD
             Level:             cloned.Consistent.Level,
             MaxLogSize:        cloned.Consistent.MaxLogSize,
             FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
             Storage:           cloned.Consistent.Storage,
             UseFileBackend:    cloned.Consistent.UseFileBackend,
+=======
+            Level:                 cloned.Consistent.Level,
+            MaxLogSize:            cloned.Consistent.MaxLogSize,
+            FlushIntervalInMs:     cloned.Consistent.FlushIntervalInMs,
+            MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
+            EncodingWorkerNum:     c.Consistent.EncodingWorkerNum,
+            FlushWorkerNum:        c.Consistent.FlushWorkerNum,
+            Storage:               cloned.Consistent.Storage,
+            UseFileBackend:        cloned.Consistent.UseFileBackend,
+            Compression:           cloned.Consistent.Compression,
+            FlushConcurrency:      cloned.Consistent.FlushConcurrency,
+>>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
         }
     }
     if cloned.Mounter != nil {
@@ -807,11 +833,24 @@ type ColumnSelector struct {
 // ConsistentConfig represents replication consistency config for a changefeed
 // This is a duplicate of config.ConsistentConfig
 type ConsistentConfig struct {
+<<<<<<< HEAD
     Level             string `json:"level"`
     MaxLogSize        int64  `json:"max_log_size"`
     FlushIntervalInMs int64  `json:"flush_interval"`
     Storage           string `json:"storage"`
     UseFileBackend    bool   `json:"use_file_backend"`
+=======
+    Level                 string `json:"level,omitempty"`
+    MaxLogSize            int64  `json:"max_log_size"`
+    FlushIntervalInMs     int64  `json:"flush_interval"`
+    MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
+    EncodingWorkerNum     int    `json:"encoding_worker_num"`
+    FlushWorkerNum        int    `json:"flush_worker_num"`
+    Storage               string `json:"storage,omitempty"`
+    UseFileBackend        bool   `json:"use_file_backend"`
+    Compression           string `json:"compression,omitempty"`
+    FlushConcurrency      int    `json:"flush_concurrency,omitempty"`
+>>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
 }
 
 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
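
Note on the ToAPIReplicaConfig hunk above: on the upstream side, EncodingWorkerNum and FlushWorkerNum are read from c.Consistent while every neighbouring field is read from cloned.Consistent. Assuming cloned is a deep copy of c at that point, the behaviour should be the same either way, but the mixed receivers look unintentional and are worth a follow-up.
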
diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go
index 45b366117c0..abef76c1c0e 100644
--- a/cdc/redo/writer/memory/file_worker.go
+++ b/cdc/redo/writer/memory/file_worker.go
@@ -170,7 +170,19 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
             return errors.Trace(egCtx.Err())
         case file := <-f.flushCh:
             start := time.Now()
+<<<<<<< HEAD
             err := f.extStorage.WriteFile(egCtx, file.filename, file.data)
+=======
+            if err := file.writer.Close(); err != nil {
+                return errors.Trace(err)
+            }
+            var err error
+            if f.cfg.FlushConcurrency <= 1 {
+                err = f.extStorage.WriteFile(egCtx, file.filename, file.writer.buf.Bytes())
+            } else {
+                err = f.multiPartUpload(egCtx, file)
+            }
+>>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
             f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
             if err != nil {
                 return errors.Trace(err)
@@ -184,6 +196,19 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
     }
 }
 
+func (f *fileWorkerGroup) multiPartUpload(ctx context.Context, file *fileCache) error {
+    multipartWrite, err := f.extStorage.Create(ctx, file.filename, &storage.WriterOption{
+        Concurrency: f.cfg.FlushConcurrency,
+    })
+    if err != nil {
+        return errors.Trace(err)
+    }
+    if _, err = multipartWrite.Write(ctx, file.writer.buf.Bytes()); err != nil {
+        return errors.Trace(err)
+    }
+    return errors.Trace(multipartWrite.Close(ctx))
+}
+
 func (f *fileWorkerGroup) bgWriteLogs(
     egCtx context.Context, inputCh <-chan *polymorphicRedoEvent,
 ) (err error) {
diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go
index 636edcf865f..722938c5e53 100644
--- a/pkg/config/consistent.go
+++ b/pkg/config/consistent.go
@@ -24,11 +24,24 @@ import (
 
 // ConsistentConfig represents replication consistency config for a changefeed.
 type ConsistentConfig struct {
+<<<<<<< HEAD
     Level             string `toml:"level" json:"level"`
     MaxLogSize        int64  `toml:"max-log-size" json:"max-log-size"`
     FlushIntervalInMs int64  `toml:"flush-interval" json:"flush-interval"`
     Storage           string `toml:"storage" json:"storage"`
     UseFileBackend    bool   `toml:"use-file-backend" json:"use-file-backend"`
+=======
+    Level                 string `toml:"level" json:"level"`
+    MaxLogSize            int64  `toml:"max-log-size" json:"max-log-size"`
+    FlushIntervalInMs     int64  `toml:"flush-interval" json:"flush-interval"`
+    MetaFlushIntervalInMs int64  `toml:"meta-flush-interval" json:"meta-flush-interval"`
+    EncodingWorkerNum     int    `toml:"encoding-worker-num" json:"encoding-worker-num"`
+    FlushWorkerNum        int    `toml:"flush-worker-num" json:"flush-worker-num"`
+    Storage               string `toml:"storage" json:"storage"`
+    UseFileBackend        bool   `toml:"use-file-backend" json:"use-file-backend"`
+    Compression           string `toml:"compression" json:"compression"`
+    FlushConcurrency      int    `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
+>>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
 }
 
 // ValidateAndAdjust validates the consistency config and adjusts it if necessary.
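
Taken together, the two file_worker.go hunks above change the flush path: the cached writer is closed first, then a FlushConcurrency of 1 or less keeps the old single-request WriteFile, and anything larger hands the buffered bytes to the multipart writer that the br storage package returns from Create. A minimal sketch of that decision, assuming the same br package the patch itself calls into; the name flushOnce and the freestanding signature are illustrative, not part of the patch:

// Package redosketch holds illustrative fragments only; it is not part of
// the series.
package redosketch

import (
    "bytes"
    "context"

    "github.com/pingcap/tidb/br/pkg/storage"
)

// flushOnce restates bgFlushFileCache's new branch: single PUT at or below
// the concurrency threshold, multipart upload above it.
func flushOnce(
    ctx context.Context,
    ext storage.ExternalStorage,
    filename string,
    buf *bytes.Buffer,
    flushConcurrency int,
) error {
    if flushConcurrency <= 1 {
        // One request; the whole object travels in a single upload.
        return ext.WriteFile(ctx, filename, buf.Bytes())
    }
    // The writer splits the payload into parts and uploads them with the
    // configured concurrency; Close completes the multipart upload.
    w, err := ext.Create(ctx, filename, &storage.WriterOption{
        Concurrency: flushConcurrency,
    })
    if err != nil {
        return err
    }
    if _, err := w.Write(ctx, buf.Bytes()); err != nil {
        return err
    }
    return w.Close(ctx)
}
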
diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go
index 3c4e69d6f05..fb920684398 100644
--- a/pkg/util/external_storage.go
+++ b/pkg/util/external_storage.go
@@ -195,9 +195,19 @@ func (s *extStorageWithTimeout) WalkDir(
 func (s *extStorageWithTimeout) Create(
     ctx context.Context, path string,
 ) (storage.ExternalFileWriter, error) {
+<<<<<<< HEAD
     ctx, cancel := context.WithTimeout(ctx, s.timeout)
     defer cancel()
     return s.ExternalStorage.Create(ctx, path)
+=======
+    if option.Concurrency <= 1 {
+        var cancel context.CancelFunc
+        ctx, cancel = context.WithTimeout(ctx, s.timeout)
+        defer cancel()
+    }
+    // multipart uploading spawns a background goroutine, can't set timeout
+    return s.ExternalStorage.Create(ctx, path, option)
+>>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
 }
 
 // Rename file name from oldFileName to newFileName
diff --git a/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml
index 7edf7be7d69..ff99477c9da 100644
--- a/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml
+++ b/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml
@@ -1,3 +1,4 @@
 [consistent]
 level = "eventual"
 storage = "s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/"
+flush-concurrency = 2

From ccbad992b71f0455c64686a1ada073bf1b7a12e3 Mon Sep 17 00:00:00 2001
From: jiangjianyuan
Date: Tue, 5 Dec 2023 15:07:47 +0800
Subject: [PATCH 2/2] Merge remote-tracking branch 'upstream/release-7.1' into
 cherry-pick-10171-to-release-7.1

# Conflicts:
#   cdc/api/v2/model.go
#   pkg/config/config_test_data.go
#   pkg/config/consistent.go
#   pkg/config/replica_config.go
---
 cdc/api/v2/model.go                   | 24 ------------------------
 cdc/redo/writer/memory/file_worker.go |  4 ----
 pkg/config/consistent.go              |  8 --------
 pkg/util/external_storage.go          |  6 ------
 4 files changed, 42 deletions(-)
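
The merge commit resolves every conflict left by the first patch in favour of the upstream side of #10227: the hunks below delete the HEAD-only field lists together with the conflict markers, leaving the ten-field variants as plain context. Read off the kept lines, pkg/config/consistent.go ends up as:

package config

// ConsistentConfig as it stands after this merge, reconstructed from the
// kept lines of the hunks below.
type ConsistentConfig struct {
    Level                 string `toml:"level" json:"level"`
    MaxLogSize            int64  `toml:"max-log-size" json:"max-log-size"`
    FlushIntervalInMs     int64  `toml:"flush-interval" json:"flush-interval"`
    MetaFlushIntervalInMs int64  `toml:"meta-flush-interval" json:"meta-flush-interval"`
    EncodingWorkerNum     int    `toml:"encoding-worker-num" json:"encoding-worker-num"`
    FlushWorkerNum        int    `toml:"flush-worker-num" json:"flush-worker-num"`
    Storage               string `toml:"storage" json:"storage"`
    UseFileBackend        bool   `toml:"use-file-backend" json:"use-file-backend"`
    Compression           string `toml:"compression" json:"compression"`
    FlushConcurrency      int    `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
}
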
diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go
index f15a1ae1f77..2d5ed4e2efc 100644
--- a/cdc/api/v2/model.go
+++ b/cdc/api/v2/model.go
@@ -260,13 +260,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
     }
     if c.Consistent != nil {
         res.Consistent = &config.ConsistentConfig{
-<<<<<<< HEAD
-            Level:             c.Consistent.Level,
-            MaxLogSize:        c.Consistent.MaxLogSize,
-            FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
-            Storage:           c.Consistent.Storage,
-            UseFileBackend:    c.Consistent.UseFileBackend,
-=======
             Level:                 c.Consistent.Level,
             MaxLogSize:            c.Consistent.MaxLogSize,
             FlushIntervalInMs:     c.Consistent.FlushIntervalInMs,
@@ -277,7 +270,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
             UseFileBackend:        c.Consistent.UseFileBackend,
             Compression:           c.Consistent.Compression,
             FlushConcurrency:      c.Consistent.FlushConcurrency,
->>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
         }
     }
     if c.Sink != nil {
@@ -640,13 +632,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
     }
     if cloned.Consistent != nil {
         res.Consistent = &ConsistentConfig{
-<<<<<<< HEAD
-            Level:             cloned.Consistent.Level,
-            MaxLogSize:        cloned.Consistent.MaxLogSize,
-            FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
-            Storage:           cloned.Consistent.Storage,
-            UseFileBackend:    cloned.Consistent.UseFileBackend,
-=======
             Level:                 cloned.Consistent.Level,
             MaxLogSize:            cloned.Consistent.MaxLogSize,
             FlushIntervalInMs:     cloned.Consistent.FlushIntervalInMs,
@@ -657,7 +642,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
             UseFileBackend:        cloned.Consistent.UseFileBackend,
             Compression:           cloned.Consistent.Compression,
             FlushConcurrency:      cloned.Consistent.FlushConcurrency,
->>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
         }
     }
     if cloned.Mounter != nil {
@@ -833,13 +817,6 @@ type ColumnSelector struct {
 // ConsistentConfig represents replication consistency config for a changefeed
 // This is a duplicate of config.ConsistentConfig
 type ConsistentConfig struct {
-<<<<<<< HEAD
-    Level             string `json:"level"`
-    MaxLogSize        int64  `json:"max_log_size"`
-    FlushIntervalInMs int64  `json:"flush_interval"`
-    Storage           string `json:"storage"`
-    UseFileBackend    bool   `json:"use_file_backend"`
-=======
     Level                 string `json:"level,omitempty"`
     MaxLogSize            int64  `json:"max_log_size"`
     FlushIntervalInMs     int64  `json:"flush_interval"`
@@ -850,7 +827,6 @@ type ConsistentConfig struct {
     UseFileBackend        bool   `json:"use_file_backend"`
     Compression           string `json:"compression,omitempty"`
     FlushConcurrency      int    `json:"flush_concurrency,omitempty"`
->>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
 }
 
 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go
index abef76c1c0e..adc7c04c084 100644
--- a/cdc/redo/writer/memory/file_worker.go
+++ b/cdc/redo/writer/memory/file_worker.go
@@ -170,9 +170,6 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
             return errors.Trace(egCtx.Err())
         case file := <-f.flushCh:
             start := time.Now()
-<<<<<<< HEAD
-            err := f.extStorage.WriteFile(egCtx, file.filename, file.data)
-=======
             if err := file.writer.Close(); err != nil {
                 return errors.Trace(err)
             }
@@ -182,7 +179,6 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
             } else {
                 err = f.multiPartUpload(egCtx, file)
             }
->>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
             f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
             if err != nil {
                 return errors.Trace(err)
diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go
index 722938c5e53..3a7a448928a 100644
--- a/pkg/config/consistent.go
+++ b/pkg/config/consistent.go
@@ -24,13 +24,6 @@ import (
 
 // ConsistentConfig represents replication consistency config for a changefeed.
 type ConsistentConfig struct {
-<<<<<<< HEAD
-    Level             string `toml:"level" json:"level"`
-    MaxLogSize        int64  `toml:"max-log-size" json:"max-log-size"`
-    FlushIntervalInMs int64  `toml:"flush-interval" json:"flush-interval"`
-    Storage           string `toml:"storage" json:"storage"`
-    UseFileBackend    bool   `toml:"use-file-backend" json:"use-file-backend"`
-=======
     Level                 string `toml:"level" json:"level"`
     MaxLogSize            int64  `toml:"max-log-size" json:"max-log-size"`
     FlushIntervalInMs     int64  `toml:"flush-interval" json:"flush-interval"`
@@ -41,7 +34,6 @@ type ConsistentConfig struct {
     UseFileBackend        bool   `toml:"use-file-backend" json:"use-file-backend"`
     Compression           string `toml:"compression" json:"compression"`
     FlushConcurrency      int    `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
->>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
 }
 
 // ValidateAndAdjust validates the consistency config and adjusts it if necessary.
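
The flush-concurrency key added to the integration-test changefeed.toml earlier in the series maps onto the FlushConcurrency field via the toml tags kept above. A quick illustrative round-trip, assuming the BurntSushi/toml decoder; the trimmed struct and the Example function below are for demonstration only:

package redosketch

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// consistentConfig mirrors just the fields this example needs; the tags
// match the ones in pkg/config/consistent.go.
type consistentConfig struct {
    Level            string `toml:"level"`
    Storage          string `toml:"storage"`
    FlushConcurrency int    `toml:"flush-concurrency"`
}

type changefeedConfig struct {
    Consistent consistentConfig `toml:"consistent"`
}

func Example() {
    doc := `
[consistent]
level = "eventual"
storage = "s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/"
flush-concurrency = 2
`
    var cfg changefeedConfig
    if _, err := toml.Decode(doc, &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.Consistent.FlushConcurrency)
    // Output: 2
}
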
diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go
index fb920684398..9dfce7ce49f 100644
--- a/pkg/util/external_storage.go
+++ b/pkg/util/external_storage.go
@@ -195,11 +195,6 @@ func (s *extStorageWithTimeout) WalkDir(
 func (s *extStorageWithTimeout) Create(
     ctx context.Context, path string,
 ) (storage.ExternalFileWriter, error) {
-<<<<<<< HEAD
-    ctx, cancel := context.WithTimeout(ctx, s.timeout)
-    defer cancel()
-    return s.ExternalStorage.Create(ctx, path)
-=======
     if option.Concurrency <= 1 {
         var cancel context.CancelFunc
         ctx, cancel = context.WithTimeout(ctx, s.timeout)
@@ -207,7 +202,6 @@ func (s *extStorageWithTimeout) Create(
     }
     // multipart uploading spawns a background goroutine, can't set timeout
     return s.ExternalStorage.Create(ctx, path, option)
->>>>>>> 89e57d7a6e (redo(ticdc): use multi part s3 uploader in redo (#10227))
 }
 
 // Rename file name from oldFileName to newFileName
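
Design note on the final hunk: the per-call deadline is only attached when option.Concurrency is 1 or less, because a multipart writer keeps uploading parts from a background goroutine after Create returns, and cancelling Create's context would abort that upload mid-flight. A caller that wants an upper bound on a multipart upload therefore has to stretch the deadline over the whole Create/Write/Close sequence instead, as in the sketch below; uploadWithDeadline and its timeout parameter are assumptions for illustration, not part of the patch. Also worth flagging: as rendered here, Create's signature still takes only ctx and path while the body references option, so the series evidently relies on a companion signature change elsewhere on the release branch.

package redosketch

import (
    "context"
    "time"

    "github.com/pingcap/tidb/br/pkg/storage"
)

// uploadWithDeadline bounds an entire multipart upload rather than just the
// Create call; once parts upload asynchronously, the overall operation is
// the only safe place for a timeout.
func uploadWithDeadline(
    ctx context.Context,
    ext storage.ExternalStorage,
    name string,
    data []byte,
    concurrency int,
    timeout time.Duration,
) error {
    // The deadline covers the part uploads and the final completion call.
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    w, err := ext.Create(ctx, name, &storage.WriterOption{Concurrency: concurrency})
    if err != nil {
        return err
    }
    if _, err := w.Write(ctx, data); err != nil {
        return err
    }
    return w.Close(ctx)
}
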