Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9954
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Nov 29, 2023
1 parent 8c006b3 commit 54b1ffc
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 4 deletions.
30 changes: 30 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,20 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var cloudStorageConfig *config.CloudStorageConfig
if c.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &config.CloudStorageConfig{
WorkerCount:         c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval:       c.Sink.CloudStorageConfig.FlushInterval,
FileSize:            c.Sink.CloudStorageConfig.FileSize,
OutputColumnID:      c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays:  c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency:    c.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -710,10 +720,20 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var cloudStorageConfig *CloudStorageConfig
if cloned.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &CloudStorageConfig{
WorkerCount:         cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval:       cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize:            cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID:      cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays:  cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency:    cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -1192,10 +1212,20 @@ type MySQLConfig struct {

// CloudStorageConfig represents a cloud storage sink configuration.
// All fields are pointers so that unset values are omitted from the JSON
// payload (`omitempty`) and can be distinguished from explicit zeros.
type CloudStorageConfig struct {
	// WorkerCount is the number of workers flushing data files concurrently.
	WorkerCount *int `json:"worker_count,omitempty"`
	// FlushInterval controls how often buffered rows are flushed to storage.
	FlushInterval *string `json:"flush_interval,omitempty"`
	// FileSize is the target size of a single data file, in bytes.
	FileSize *int `json:"file_size,omitempty"`
	// OutputColumnID controls whether column IDs are written in the output.
	OutputColumnID *bool `json:"output_column_id,omitempty"`
	// FileExpirationDays is how long data files are retained before cleanup.
	FileExpirationDays *int `json:"file_expiration_days,omitempty"`
	// FileCleanupCronSpec is the cron expression scheduling the file cleanup job.
	FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
	// FlushConcurrency is the concurrency of the multipart uploader used when
	// flushing a single file to S3-compatible storage (#9954).
	FlushConcurrency *int `json:"flush_concurrency,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
8 changes: 5 additions & 3 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,14 @@ func TestGCAndCleanup(t *testing.T) {
cancel()
require.ErrorIs(t, eg.Wait(), context.Canceled)

m.Cleanup(ctx)
ret, err := extStorage.FileExists(ctx, getDeletedChangefeedMarker(changefeedID))
cleanupCtx, cleanupCancel := context.WithCancel(context.Background())
defer cleanupCancel()
m.Cleanup(cleanupCtx)
ret, err := extStorage.FileExists(cleanupCtx, getDeletedChangefeedMarker(changefeedID))
require.NoError(t, err)
require.True(t, ret)
cnt := 0
extStorage.WalkDir(ctx, nil, func(path string, size int64) error {
extStorage.WalkDir(cleanupCtx, nil, func(path string, size int64) error {
cnt++
return nil
})
Expand Down
37 changes: 36 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -237,12 +237,47 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
callbacks = append(callbacks, msg.Callback)
}

if err := d.statistics.RecordBatchExecution(func() (int, int64, error) {
	start := time.Now()
	defer d.metricFlushDuration.Observe(time.Since(start).Seconds())

	if d.config.FlushConcurrency <= 1 {
		return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
	}

	writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
		Concurrency: d.config.FlushConcurrency,
	})
	if inErr != nil {
		return 0, 0, inErr
	}

	defer func() {
		closeErr := writer.Close(ctx)
		if closeErr != nil {
			log.Error("failed to close writer", zap.Error(closeErr),
				zap.Int("workerID", d.id),
				zap.Any("table", task.tableInfo.TableName),
				zap.String("namespace", d.changeFeedID.Namespace),
				zap.String("changefeed", d.changeFeedID.ID))
			if inErr == nil {
				inErr = closeErr
			}
		}
	}()
	if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
		return 0, 0, inErr
	}
	return rowsCnt, bytesCnt, nil
}); err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,14 @@ type CloudStorageConfig struct {
FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"`
FileSize *int `toml:"file-size" json:"file-size,omitempty"`

OutputColumnID      *bool   `toml:"output-column-id" json:"output-column-id,omitempty"`
FileExpirationDays  *int    `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
FlushConcurrency    *int    `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
Expand Down
20 changes: 20 additions & 0 deletions pkg/sink/cloudstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ const (
minFlushInterval = 2 * time.Second
// the upper limit of flush-interval.
maxFlushInterval = 10 * time.Minute
// defaultFlushConcurrency is the default value of flush-concurrency.
defaultFlushConcurrency = 1
// the lower limit of flush-concurrency.
minFlushConcurrency = 1
// the upper limit of flush-concurrency.
maxFlushConcurrency = 512
// defaultFileSize is the default value of file-size.
defaultFileSize = 64 * 1024 * 1024
// the lower limit of file size
Expand All @@ -65,6 +71,7 @@ type Config struct {
DateSeparator string
EnablePartitionSeparator bool
OutputColumnID bool
FlushConcurrency int
}

// NewConfig returns the default cloud storage sink config.
Expand Down Expand Up @@ -117,11 +124,24 @@ func (c *Config) Apply(
c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth)
if replicaConfig.Sink.CloudStorageConfig != nil {
c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID)
if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil {
	c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays
}
if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil {
	c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec
}
c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency)
}

if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth {
c.FileIndexWidth = config.DefaultFileIndexWidth
}
if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency {
c.FlushConcurrency = defaultFlushConcurrency
}

return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sink/cloudstorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestConfigApply(t *testing.T) {
expected.FileIndexWidth = config.DefaultFileIndexWidth
expected.DateSeparator = config.DateSeparatorDay.String()
expected.EnablePartitionSeparator = true
expected.FlushConcurrency = 1
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv"
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
Expand Down

0 comments on commit 54b1ffc

Please sign in to comment.