This is an automated cherry-pick of pingcap#10227
Signed-off-by: ti-chi-bot <[email protected]>
sdojjy authored and ti-chi-bot committed Dec 2, 2023
1 parent 7b984d6 commit 8f92152
Showing 5 changed files with 88 additions and 0 deletions.
39 changes: 39 additions & 0 deletions cdc/api/v2/model.go
@@ -260,11 +260,24 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
}
if c.Consistent != nil {
res.Consistent = &config.ConsistentConfig{
Level: c.Consistent.Level,
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
EncodingWorkerNum: c.Consistent.EncodingWorkerNum,
FlushWorkerNum: c.Consistent.FlushWorkerNum,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
Compression: c.Consistent.Compression,
FlushConcurrency: c.Consistent.FlushConcurrency,
}
}
if c.Sink != nil {
@@ -627,11 +640,24 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Level: cloned.Consistent.Level,
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
EncodingWorkerNum: cloned.Consistent.EncodingWorkerNum,
FlushWorkerNum: cloned.Consistent.FlushWorkerNum,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
Compression: cloned.Consistent.Compression,
FlushConcurrency: cloned.Consistent.FlushConcurrency,
}
}
if cloned.Mounter != nil {
@@ -807,11 +833,24 @@ type ColumnSelector struct {
// ConsistentConfig represents replication consistency config for a changefeed
// This is a duplicate of config.ConsistentConfig
type ConsistentConfig struct {
Level string `json:"level,omitempty"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
EncodingWorkerNum int `json:"encoding_worker_num"`
FlushWorkerNum int `json:"flush_worker_num"`
Storage string `json:"storage,omitempty"`
UseFileBackend bool `json:"use_file_backend"`
Compression string `json:"compression,omitempty"`
FlushConcurrency int `json:"flush_concurrency,omitempty"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
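Note: a standalone sketch of how the extended API struct above serializes. The mirror struct duplicates the json tags from this diff so the example runs on its own; the sample values are illustrative, not TiCDC defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of the API-level ConsistentConfig above; the json tags are
// copied verbatim from the diff.
type ConsistentConfig struct {
	Level                 string `json:"level,omitempty"`
	MaxLogSize            int64  `json:"max_log_size"`
	FlushIntervalInMs     int64  `json:"flush_interval"`
	MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
	EncodingWorkerNum     int    `json:"encoding_worker_num"`
	FlushWorkerNum        int    `json:"flush_worker_num"`
	Storage               string `json:"storage,omitempty"`
	UseFileBackend        bool   `json:"use_file_backend"`
	Compression           string `json:"compression,omitempty"`
	FlushConcurrency      int    `json:"flush_concurrency,omitempty"`
}

func main() {
	cfg := ConsistentConfig{
		Level:            "eventual",
		Storage:          "s3://logbucket/test-changefeed",
		FlushConcurrency: 2, // any value > 1 selects the multipart uploader
	}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	// level, storage, compression, and flush_concurrency are tagged
	// omitempty, so they drop out of the payload at their zero values.
	fmt.Println(string(out))
}
```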
25 changes: 25 additions & 0 deletions cdc/redo/writer/memory/file_worker.go
@@ -170,7 +170,19 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
return errors.Trace(egCtx.Err())
case file := <-f.flushCh:
start := time.Now()
if err := file.writer.Close(); err != nil {
return errors.Trace(err)
}
var err error
if f.cfg.FlushConcurrency <= 1 {
err = f.extStorage.WriteFile(egCtx, file.filename, file.writer.buf.Bytes())
} else {
err = f.multiPartUpload(egCtx, file)
}
f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
if err != nil {
return errors.Trace(err)
@@ -184,6 +196,19 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
}
}

func (f *fileWorkerGroup) multiPartUpload(ctx context.Context, file *fileCache) error {
multipartWrite, err := f.extStorage.Create(ctx, file.filename, &storage.WriterOption{
Concurrency: f.cfg.FlushConcurrency,
})
if err != nil {
return errors.Trace(err)
}
if _, err = multipartWrite.Write(ctx, file.writer.buf.Bytes()); err != nil {
return errors.Trace(err)
}
return errors.Trace(multipartWrite.Close(ctx))
}

func (f *fileWorkerGroup) bgWriteLogs(
egCtx context.Context, inputCh <-chan *polymorphicRedoEvent,
) (err error) {
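Note: the new multiPartUpload path is a thin wrapper over br's external-storage writer API, exactly as the diff shows. A minimal end-to-end sketch of that call sequence; the bucket URI, endpoint, object name, and payload size are illustrative, and error handling is condensed to log.Fatal.

```go
package main

import (
	"context"
	"log"

	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	ctx := context.Background()

	backend, err := storage.ParseBackend(
		"s3://logbucket/redo?endpoint=http://127.0.0.1:24927/", nil)
	if err != nil {
		log.Fatal(err)
	}
	extStorage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{})
	if err != nil {
		log.Fatal(err)
	}

	// Stand-in for a cached redo log file (16 MiB of zeros).
	data := make([]byte, 16<<20)

	// Concurrency > 1 selects the multipart uploader, mirroring
	// fileWorkerGroup.multiPartUpload in the diff above.
	w, err := extStorage.Create(ctx, "redo.log", &storage.WriterOption{Concurrency: 4})
	if err != nil {
		log.Fatal(err)
	}
	if _, err := w.Write(ctx, data); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(ctx); err != nil {
		log.Fatal(err)
	}
}
```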
13 changes: 13 additions & 0 deletions pkg/config/consistent.go
@@ -24,11 +24,24 @@ import (

// ConsistentConfig represents replication consistency config for a changefeed.
type ConsistentConfig struct {
Level string `toml:"level" json:"level"`
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"`
EncodingWorkerNum int `toml:"encoding-worker-num" json:"encoding-worker-num"`
FlushWorkerNum int `toml:"flush-worker-num" json:"flush-worker-num"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
Compression string `toml:"compression" json:"compression"`
FlushConcurrency int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
}

// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
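Note: the toml tags above map one-to-one onto changefeed config files such as the test fixture at the bottom of this diff. A self-contained decoding sketch; the mirror struct copies the tags from the diff, and BurntSushi/toml is assumed here as the decoder.

```go
package main

import (
	"fmt"
	"log"

	"github.com/BurntSushi/toml"
)

// Mirror of config.ConsistentConfig above, with the toml tags from the diff.
type ConsistentConfig struct {
	Level                 string `toml:"level"`
	MaxLogSize            int64  `toml:"max-log-size"`
	FlushIntervalInMs     int64  `toml:"flush-interval"`
	MetaFlushIntervalInMs int64  `toml:"meta-flush-interval"`
	EncodingWorkerNum     int    `toml:"encoding-worker-num"`
	FlushWorkerNum        int    `toml:"flush-worker-num"`
	Storage               string `toml:"storage"`
	UseFileBackend        bool   `toml:"use-file-backend"`
	Compression           string `toml:"compression"`
	FlushConcurrency      int    `toml:"flush-concurrency"`
}

// Same shape as the [consistent] fixture added in this commit.
const doc = `
level = "eventual"
storage = "s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/"
flush-concurrency = 2
`

func main() {
	var cfg ConsistentConfig
	if _, err := toml.Decode(doc, &cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", cfg) // FlushConcurrency:2 enables multipart upload
}
```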
10 changes: 10 additions & 0 deletions pkg/util/external_storage.go
@@ -195,9 +195,19 @@ func (s *extStorageWithTimeout) WalkDir(
func (s *extStorageWithTimeout) Create(
ctx context.Context, path string, option *storage.WriterOption,
) (storage.ExternalFileWriter, error) {
if option.Concurrency <= 1 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}
// multipart uploading spawns background goroutines, so the upload
// cannot be bounded by a timeout context here
return s.ExternalStorage.Create(ctx, path, option)
}

// Rename file name from oldFileName to newFileName
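Note: the Create change encodes a timing contract: a multipart writer keeps uploading from background goroutines after Create returns, so a context.WithTimeout with a deferred cancel would abort those uploads mid-flight. Below, a sketch of the pattern in isolation; the function and parameter names are illustrative stand-ins for the wrapper's fields, not its real API.

```go
package sketch

import (
	"context"
	"time"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// createWithTimeout mirrors extStorageWithTimeout.Create: a timeout is
// applied only when no background uploader can outlive the call.
func createWithTimeout(
	ctx context.Context, s storage.ExternalStorage, path string,
	option *storage.WriterOption, timeout time.Duration,
) (storage.ExternalFileWriter, error) {
	if option.Concurrency <= 1 {
		// Single-part path: keep the pre-existing behavior of bounding
		// the call with a timeout.
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	// Multipart path: the returned writer uploads parts from background
	// goroutines after this function returns, so the caller's context is
	// passed through untouched.
	return s.Create(ctx, path, option)
}
```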
1 change: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
[consistent]
level = "eventual"
storage = "s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/"
flush-concurrency = 2
