
Commit

This is an automated cherry-pick of #10097
Signed-off-by: ti-chi-bot <[email protected]>
CharlesCheung96 authored and ti-chi-bot committed Nov 26, 2023
1 parent 587b155 commit 87c7482
Showing 17 changed files with 1,313 additions and 16 deletions.
309 changes: 309 additions & 0 deletions cdc/api/v2/model.go

Large diffs are not rendered by default.
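Note: the model.go diff above is not shown, but the two new sink options it plumbs through the v2 API surface later in this diff as config.CloudStorageConfig fields. A hedged sketch of setting them programmatically, using only identifiers that appear in the test file below (the values and the surrounding main are illustrative):

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/util"
)

func main() {
	// FileExpirationDays and FileCleanupCronSpec are the knobs added by
	// #10097; per bgCleanup below, expiration only takes effect when the
	// date separator is "day". The values here are examples.
	replicaConfig := config.GetDefaultReplicaConfig()
	replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{
		FileExpirationDays:  util.AddressOf(7),
		FileCleanupCronSpec: util.AddressOf("0 0 2 * * *"), // 02:00:00 daily
	}
	fmt.Println(*replicaConfig.Sink.CloudStorageConfig.FileExpirationDays)
}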

182 changes: 181 additions & 1 deletion cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
@@ -17,19 +17,27 @@ import (
"context"
"encoding/json"
"net/url"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
=======
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/util"
"github.com/robfig/cron"
"go.uber.org/zap"
)

@@ -42,12 +50,43 @@ type ddlSink struct {
// statistic is used to record the DDL metrics
statistics *metrics.Statistics
storage storage.ExternalStorage
cfg *cloudstorage.Config
cron *cron.Cron

<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
lastSendCheckpointTsTime time.Time
}

// NewCloudStorageDDLSink creates a ddl sink for cloud storage.
func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) {
=======
lastCheckpointTs atomic.Uint64
lastSendCheckpointTsTime time.Time
}

// NewDDLSink creates a ddl sink for cloud storage.
func NewDDLSink(ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
) (*DDLSink, error) {
return newDDLSink(ctx, changefeedID, sinkURI, replicaConfig, nil)
}

func newDDLSink(ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
cleanupJobs []func(), /* only for test */
) (*DDLSink, error) {
// create cloud storage config and then apply the params of sinkURI to it.
cfg := cloudstorage.NewConfig()
err := cfg.Apply(ctx, sinkURI, replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}

>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String())
if err != nil {
return nil, err
@@ -57,10 +96,25 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) {
d := &ddlSink{
id: changefeedID,
storage: storage,
<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
lastSendCheckpointTsTime: time.Now(),
}

=======
statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink),
cfg: cfg,
lastSendCheckpointTsTime: time.Now(),
}

if err := d.initCron(ctx, sinkURI, cleanupJobs); err != nil {
return nil, errors.Trace(err)
}
// Note: It is intended to run the cleanup goroutine in the background.
// We don't wait for it to finish since the goroutine would be stuck if
// the downstream is abnormal, especially when the downstream is an NFS.
go d.bgCleanup(ctx)
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
return d, nil
}

@@ -89,15 +143,23 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
}

var def cloudstorage.TableDefinition
<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
def.FromDDLEvent(ddl)
=======
def.FromDDLEvent(ddl, d.cfg.OutputColumnID)
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
if err := writeFile(def); err != nil {
return errors.Trace(err)
}

if ddl.Type == timodel.ActionExchangeTablePartition {
// For exchange partition, we need to write the schema of the source table.
var sourceTableDef cloudstorage.TableDefinition
<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version)
=======
sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.cfg.OutputColumnID)
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
return writeFile(sourceTableDef)
}
return nil
@@ -115,6 +177,7 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context,

defer func() {
d.lastSendCheckpointTsTime = time.Now()
d.lastCheckpointTs.Store(ts)
}()
ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts})
if err != nil {
@@ -124,7 +187,124 @@ return errors.Trace(err)
return errors.Trace(err)
}

<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
func (d *ddlSink) Close() error {
=======
func (d *DDLSink) initCron(
ctx context.Context, sinkURI *url.URL, cleanupJobs []func(),
) (err error) {
if cleanupJobs == nil {
cleanupJobs = d.genCleanupJob(ctx, sinkURI)
}

d.cron = cron.New()
for _, job := range cleanupJobs {
err = d.cron.AddFunc(d.cfg.FileCleanupCronSpec, job)
if err != nil {
return err
}
}
return nil
}

func (d *DDLSink) bgCleanup(ctx context.Context) {
if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 {
log.Info("skip cleanup expired files for storage sink",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.String("dateSeparator", d.cfg.DateSeparator),
zap.Int("expiredFileTTL", d.cfg.FileExpirationDays))
return
}

d.cron.Start()
defer d.cron.Stop()
log.Info("start schedule cleanup expired files for storage sink",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.String("dateSeparator", d.cfg.DateSeparator),
zap.Int("expiredFileTTL", d.cfg.FileExpirationDays))

// wait for the context done
<-ctx.Done()
log.Info("stop schedule cleanup expired files for storage sink",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Error(ctx.Err()))
}

func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) []func() {
ret := []func(){}

isLocal := uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == ""
isRemoveEmptyDirsRuning := atomic.Bool{}
if isLocal {
ret = append(ret, func() {
if !isRemoveEmptyDirsRuning.CompareAndSwap(false, true) {
log.Warn("remove empty dirs is already running, skip this round",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID))
return
}
defer isRemoveEmptyDirsRuning.Store(false)

checkpointTs := d.lastCheckpointTs.Load()
start := time.Now()
cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path)
if err != nil {
log.Error("failed to remove empty dirs",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Duration("cost", time.Since(start)),
zap.Error(err),
)
return
}
log.Info("remove empty dirs",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("count", cnt),
zap.Duration("cost", time.Since(start)))
})
}

isCleanupRunning := atomic.Bool{}
ret = append(ret, func() {
if !isCleanupRunning.CompareAndSwap(false, true) {
log.Warn("cleanup expired files is already running, skip this round",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID))
return
}

defer isCleanupRunning.Store(false)
start := time.Now()
checkpointTs := d.lastCheckpointTs.Load()
cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs)
if err != nil {
log.Error("failed to remove expired files",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Duration("cost", time.Since(start)),
zap.Error(err),
)
return
}
log.Info("remove expired files",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("count", cnt),
zap.Duration("cost", time.Since(start)))
})
return ret
}

// Close closes the sink.
func (d *DDLSink) Close() {
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
if d.statistics != nil {
d.statistics.Close()
}
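Aside: the cleanup wiring above follows a common robfig/cron v1 pattern: initCron registers each job under cfg.FileCleanupCronSpec, bgCleanup starts the scheduler and blocks on ctx.Done(), and each job refuses to overlap itself via an atomic flag. A minimal, self-contained sketch of that pattern follows; the function and variable names are illustrative, not the tiflow API, and v1 cron specs carry a leading seconds field.

package main

import (
	"context"
	"log"
	"sync/atomic"
	"time"

	"github.com/robfig/cron" // v1 API: AddFunc(spec string, cmd func()) error
)

// scheduleCleanup mirrors initCron/bgCleanup: register the job under a cron
// spec, skip a round if the previous one is still running, start the
// scheduler, and stop it once the context is cancelled.
func scheduleCleanup(ctx context.Context, spec string, job func()) error {
	var running atomic.Bool
	c := cron.New()
	err := c.AddFunc(spec, func() {
		if !running.CompareAndSwap(false, true) {
			log.Println("cleanup is already running, skip this round")
			return
		}
		defer running.Store(false)
		job()
	})
	if err != nil {
		return err
	}
	c.Start()
	defer c.Stop()
	<-ctx.Done() // bgCleanup blocks the same way until the changefeed stops
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// "* * * * * *" fires every second (second minute hour day month weekday).
	_ = scheduleCleanup(ctx, "* * * * * *", func() { log.Println("cleanup tick") })
}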
52 changes: 50 additions & 2 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
@@ -19,24 +19,34 @@ import (
"net/url"
"os"
"path"
"sync/atomic"
"testing"
"time"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

func TestWriteDDLEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s", parentDir)
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
sink, err := NewCloudStorageDDLSink(ctx, sinkURI)
=======
replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.Nil(t, err)
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig)
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
require.Nil(t, err)

ddlEvent := &model.DDLEvent{
@@ -97,10 +107,17 @@ func TestWriteCheckpointTs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s", parentDir)
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
sink, err := NewCloudStorageDDLSink(ctx, sinkURI)
=======
replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.Nil(t, err)
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig)
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
require.Nil(t, err)
tables := []*model.TableInfo{
{
@@ -132,3 +149,34 @@ require.Nil(t, err)
require.Nil(t, err)
require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata))
}

func TestCleanupExpiredFiles(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{
FileExpirationDays: util.AddressOf(1),
FileCleanupCronSpec: util.AddressOf("* * * * * *"),
}
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.Nil(t, err)

cnt := atomic.Int64{}
cleanupJobs := []func(){
func() {
cnt.Add(1)
},
}
sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJobs)
require.Nil(t, err)

_ = sink
time.Sleep(3 * time.Second)
require.LessOrEqual(t, int64(1), cnt.Load())
}
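
Note on the test: the spec "* * * * * *" fires once per second under robfig/cron v1 (six fields, seconds first), so sleeping three seconds reliably observes at least one run of the injected job. The real removal criterion lives in cloudstorage.RemoveExpiredFiles, which this diff does not show; under day-based date separation it amounts to dropping day partitions older than the checkpoint minus FileExpirationDays. An illustrative reconstruction of that predicate, assuming the YYYY-MM-DD directory layout that a "day" date separator produces:

package main

import (
	"fmt"
	"time"
)

// isExpiredDay is a hedged sketch, not the tiflow helper: it reports whether
// a day-partition directory falls more than expirationDays before the day of
// the current checkpoint.
func isExpiredDay(dayDir string, checkpoint time.Time, expirationDays int) (bool, error) {
	day, err := time.Parse("2006-01-02", dayDir)
	if err != nil {
		return false, err
	}
	deadline := checkpoint.AddDate(0, 0, -expirationDays)
	return day.Before(deadline), nil
}

func main() {
	checkpoint := time.Date(2023, 11, 26, 0, 0, 0, 0, time.UTC)
	expired, _ := isExpiredDay("2023-11-20", checkpoint, 1)
	fmt.Println(expired) // true: more than one day before the checkpoint day
}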
6 changes: 6 additions & 0 deletions go.mod
@@ -67,7 +67,13 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
<<<<<<< HEAD
github.com/shirou/gopsutil/v3 v3.23.1
=======
github.com/robfig/cron v1.2.0
github.com/segmentio/kafka-go v0.4.41-0.20230526171612-f057b1d369cd
github.com/shirou/gopsutil/v3 v3.23.5
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097))
github.com/shopspring/decimal v1.3.0
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.6.1
9 changes: 9 additions & 0 deletions go.sum
@@ -1108,8 +1108,17 @@ github.com/remyoudompheng/bigfft v0.0.0-20220927061507-ef77025ab5aa/go.mod h1:qq
github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
<<<<<<< HEAD
github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8=
github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
=======
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097))
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=