From 600cdb84af77a8e46306b7d054c20ec07497be42 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 14 Nov 2023 19:02:50 +0800 Subject: [PATCH 01/10] cleanup expired files by day --- cdc/api/v2/model.go | 30 +++--- .../cloudstorage/cloud_storage_ddl_sink.go | 100 ++++++++++++++++-- .../cloud_storage_ddl_sink_test.go | 45 +++++++- go.mod | 1 + go.sum | 2 + pkg/config/sink.go | 20 +++- pkg/sink/cloudstorage/config.go | 22 +++- pkg/sink/cloudstorage/path.go | 36 +++++++ pkg/sink/cloudstorage/path_test.go | 86 +++++++++++++++ pkg/util/external_storage.go | 18 ++-- 10 files changed, 324 insertions(+), 36 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 9ff979894e6..99f0bd428a0 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -430,10 +430,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, } } @@ -711,10 +713,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, } } @@ -1195,10 +1199,12 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 0e0f6d169f8..c4e8d84e01b 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -17,9 +17,9 @@ import ( "context" "encoding/json" 
"net/url" + "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" timodel "github.com/pingcap/tidb/parser/model" @@ -27,9 +27,11 @@ import ( "github.com/pingcap/tiflow/cdc/sink/ddlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/util" + "github.com/robfig/cron" "go.uber.org/zap" ) @@ -43,8 +45,9 @@ type DDLSink struct { // statistic is used to record the DDL metrics statistics *metrics.Statistics storage storage.ExternalStorage + cfg *cloudstorage.Config - outputColumnID bool + lastCheckpointTs atomic.Uint64 lastSendCheckpointTsTime time.Time } @@ -54,6 +57,22 @@ func NewDDLSink(ctx context.Context, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, ) (*DDLSink, error) { + return newDDLSink(ctx, changefeedID, sinkURI, replicaConfig, nil) +} + +func newDDLSink(ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + cleanupJob func(), +) (*DDLSink, error) { + // create cloud storage config and then apply the params of sinkURI to it. + cfg := cloudstorage.NewConfig() + err := cfg.Apply(ctx, sinkURI, replicaConfig) + if err != nil { + return nil, err + } + storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String()) if err != nil { return nil, err @@ -63,13 +82,17 @@ func NewDDLSink(ctx context.Context, id: changefeedID, storage: storage, statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink), + cfg: cfg, lastSendCheckpointTsTime: time.Now(), } - if replicaConfig != nil && replicaConfig.Sink.CloudStorageConfig != nil { - d.outputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + // Note: It is intended to run the cleanup goroutine in the background. + // we don't wait for it to finish since the gourotine would be stuck if + // the downstream is abnormal, especially when the downstream is a nfs. + if cleanupJob == nil { + cleanupJob = d.runCleanup(ctx) } - + go d.bgCleanup(ctx, cleanupJob) return d, nil } @@ -98,7 +121,7 @@ func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error } var def cloudstorage.TableDefinition - def.FromDDLEvent(ddl, d.outputColumnID) + def.FromDDLEvent(ddl, d.cfg.OutputColumnID) if err := writeFile(def); err != nil { return errors.Trace(err) } @@ -106,7 +129,7 @@ func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error if ddl.Type == timodel.ActionExchangeTablePartition { // For exchange partition, we need to write the schema of the source table. 
var sourceTableDef cloudstorage.TableDefinition - sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.outputColumnID) + sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.cfg.OutputColumnID) return writeFile(sourceTableDef) } return nil @@ -125,6 +148,7 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, defer func() { d.lastSendCheckpointTsTime = time.Now() + d.lastCheckpointTs.Store(ts) }() ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts}) if err != nil { @@ -134,6 +158,68 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, return errors.Trace(err) } +func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) { + if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 { + log.Info("skip cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("date-separator", d.cfg.DateSeparator), + zap.Int("expired-file-ttl", d.cfg.FileExpirationDays)) + return + } + + clenupCron := cron.New() + clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, cleanupJob) + clenupCron.Start() + defer clenupCron.Stop() + log.Info("start schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("date-separator", d.cfg.DateSeparator), + zap.Int("expired-file-ttl", d.cfg.FileExpirationDays)) + + // wait for the context done + <-ctx.Done() + log.Info("stop schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Error(ctx.Err())) +} + +func (d *DDLSink) runCleanup(ctx context.Context) func() { + isCleanupRunning := atomic.Bool{} + return func() { + if !isCleanupRunning.CompareAndSwap(false, true) { + log.Warn("cleanup expired files is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + defer isCleanupRunning.Store(false) + start := time.Now() + checkpointTs := d.lastCheckpointTs.Load() + cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.storage, d.cfg, checkpointTs) + if err != nil { + log.Error("failed to remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + + log.Info("remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + } +} + // Close closes the sink. 
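// Illustrative sketch, not part of the patch: how the cleanup job above can turn
// a checkpoint TSO into the "expired before" date string. It assumes the "time"
// package and github.com/tikv/client-go/v2/oracle are imported, and that
// fileExpirationDays is the configured TTL in days.
func expiredDateCutoff(checkpointTs uint64, fileExpirationDays int) string {
	// Convert the TSO into wall-clock time, step back by the TTL, and keep only
	// the yyyy-mm-dd component; since "2006-01-02" strings sort the same way as
	// the dates they encode, any day directory that compares lexicographically
	// smaller than this cutoff is considered expired.
	ttl := time.Duration(fileExpirationDays) * 24 * time.Hour
	return oracle.GetTimeFromTS(checkpointTs).Add(-ttl).Format("2006-01-02")
}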
func (d *DDLSink) Close() { if d.statistics != nil { diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index 0e62ec24c9a..b4cc70ae47b 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -26,6 +26,8 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -33,10 +35,13 @@ func TestWriteDDLEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) - sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, nil) + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) require.Nil(t, err) ddlEvent := &model.DDLEvent{ @@ -97,10 +102,13 @@ func TestWriteCheckpointTs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) - sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, nil) + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) require.Nil(t, err) tables := []*model.TableInfo{ { @@ -132,3 +140,32 @@ func TestWriteCheckpointTs(t *testing.T) { require.Nil(t, err) require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata)) } + +func TestCleanupExpiredFiles(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.Nil(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + + cnt := 0 + cleanupJob := func() { + cnt++ + } + sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJob) + require.Nil(t, err) + + _ = sink + time.Sleep(3 * time.Second) + require.LessOrEqual(t, 1, cnt) +} diff --git a/go.mod b/go.mod index adee3545f7b..7f358557ccf 100644 --- a/go.mod +++ b/go.mod @@ -147,6 +147,7 @@ require ( github.com/jfcg/sixb v1.3.8 // indirect github.com/jfcg/sorty/v2 v2.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.2.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/go.sum b/go.sum index cd211db13e1..fd72156f024 100644 --- a/go.sum +++ b/go.sum @@ -1122,6 +1122,8 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rivo/uniseg 
v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 8c7de70f564..51cfd423bff 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -272,6 +272,22 @@ func (d *DateSeparator) FromString(separator string) error { return nil } +// GetPattern returns the pattern of the date separator. +func (d DateSeparator) GetPattern() string { + switch d { + case DateSeparatorNone: + return "" + case DateSeparatorYear: + return `\d{4}` + case DateSeparatorMonth: + return `\d{4}-\d{2}` + case DateSeparatorDay: + return `\d{4}-\d{2}-\d{2}` + default: + return "" + } +} + func (d DateSeparator) String() string { switch d { case DateSeparatorNone: @@ -577,7 +593,9 @@ type CloudStorageConfig struct { FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` FileSize *int `toml:"file-size" json:"file-size,omitempty"` - OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` + FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 49d159330f5..9ed6ca04c4b 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -48,6 +48,12 @@ const ( minFileSize = 1024 * 1024 // the upper limit of file size maxFileSize = 512 * 1024 * 1024 + + // disable file cleanup by default + defaultFileExpirationDays = 0 + // Second | Minute | Hour | Dom | Month | DowOptional + // `0 0 2 * * ?` means 2:00:00 AM every day + defaultFileCleanupCronSpec = "0 0 2 * * *" ) type urlConfig struct { @@ -63,6 +69,8 @@ type Config struct { FileSize int FileIndexWidth int DateSeparator string + FileExpirationDays int + FileCleanupCronSpec string EnablePartitionSeparator bool OutputColumnID bool } @@ -70,9 +78,11 @@ type Config struct { // NewConfig returns the default cloud storage sink config. 
func NewConfig() *Config { return &Config{ - WorkerCount: defaultWorkerCount, - FlushInterval: defaultFlushInterval, - FileSize: defaultFileSize, + WorkerCount: defaultWorkerCount, + FlushInterval: defaultFlushInterval, + FileSize: defaultFileSize, + FileExpirationDays: defaultFileExpirationDays, + FileCleanupCronSpec: defaultFileCleanupCronSpec, } } @@ -117,6 +127,12 @@ func (c *Config) Apply( c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth) if replicaConfig.Sink.CloudStorageConfig != nil { c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil { + c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays + } + if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { + c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec + } } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index 97694dc6beb..628dabb7d3e 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -21,6 +21,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" @@ -29,6 +30,8 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/util" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -403,3 +406,36 @@ func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, err return fileIdx, nil } + +var dateSeparatorDayRegexp *regexp.Regexp + +// RemoveExpiredFiles removes expired files from external storage. 
+func RemoveExpiredFiles( + ctx context.Context, + storage storage.ExternalStorage, + cfg *Config, + checkpointTs model.Ts, +) (uint64, error) { + if cfg.DateSeparator != config.DateSeparatorDay.String() { + return 0, nil + } + if dateSeparatorDayRegexp == nil { + dateSeparatorDayRegexp = regexp.MustCompile(config.DateSeparatorDay.GetPattern()) + } + + ttl := time.Duration(cfg.FileExpirationDays) * time.Hour * 24 + currTime := oracle.GetTimeFromTS(checkpointTs).Add(-ttl) + expiredDate := currTime.Format("2006-01-02") + + cnt := uint64(0) + err := util.RemoveFilesIf(ctx, storage, func(path string) bool { + // the path is like: /////CDC{num}.extension + match := dateSeparatorDayRegexp.FindString(path) + if match != "" && match < expiredDate { + cnt++ + return true + } + return false + }, nil) + return cnt, err +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 6e97da55cb5..4dfa67d1060 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FilePathGenerator { @@ -328,3 +329,88 @@ func TestCheckOrWriteSchema(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(cnt)) } + +func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() + uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) + storage, err := util.GetExternalStorageFromURI(ctx, uri) + require.NoError(t, err) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.DateSeparator = util.AddressOf(config.DateSeparatorDay.String()) + replicaConfig.Sink.Protocol = util.AddressOf(config.ProtocolCsv.String()) + replicaConfig.Sink.FileIndexWidth = util.AddressOf(6) + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + cfg := NewConfig() + err = cfg.Apply(ctx, sinkURI, replicaConfig) + + // generate some expired files + filesWithoutPartition := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-01/CDC000001.csv", + "schema1/table1/5/2021-01-01/CDC000002.csv", + "schema1/table1/5/2021-01-01/CDC000003.csv", + "schema1/table1/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma1-table2 + "schema1/table2/5/2021-01-01/CDC000001.csv", + "schema1/table2/5/2021-01-01/CDC000002.csv", + "schema1/table2/5/2021-01-01/CDC000003.csv", + "schema1/table2/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table2/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithoutPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesWithPartition := []string{ + // schma1-table1 + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000001.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000002.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000003.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be 
cleaned + // schma2-table1 + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000001.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000002.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000003.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema2/table1/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesNotExpired := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-02/CDC000001.csv", + "schema1/table1/5/2021-01-02/CDC000002.csv", + "schema1/table1/5/2021-01-02/CDC000003.csv", + "schema1/table1/5/2021-01-02/" + defaultIndexFileName, // index + // schma1-table2 + "schema1/table2/5/2021-01-02/CDC000001.csv", + "schema1/table2/5/2021-01-02/CDC000002.csv", + "schema1/table2/5/2021-01-02/CDC000003.csv", + "schema1/table2/5/2021-01-02/" + defaultIndexFileName, // index + } + for _, file := range filesNotExpired { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC) + checkpointTs := oracle.GoTimeToTS(currTime) + cnt, err := RemoveExpiredFiles(ctx, storage, cfg, checkpointTs) + require.NoError(t, err) + require.Equal(t, uint64(16), cnt) +} diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go index 19be69c0126..6ff01993967 100644 --- a/pkg/util/external_storage.go +++ b/pkg/util/external_storage.go @@ -260,38 +260,38 @@ func RemoveFilesIf( } log.Debug("Removing files", zap.Any("toRemoveFiles", toRemoveFiles)) - - for _, path := range toRemoveFiles { - if err := extStorage.DeleteFile(ctx, path); err != nil { - return errors.ErrExternalStorageAPI.Wrap(err) - } - } return DeleteFilesInExtStorage(ctx, extStorage, toRemoveFiles) } // DeleteFilesInExtStorage deletes files in external storage concurrently. +// TODO: Add a test for this function to cover batch delete. 
func DeleteFilesInExtStorage( ctx context.Context, extStorage storage.ExternalStorage, toRemoveFiles []string, ) error { limit := make(chan struct{}, 32) + batch := 3000 eg, egCtx := errgroup.WithContext(ctx) - for _, file := range toRemoveFiles { + for len(toRemoveFiles) > 0 { select { case <-egCtx.Done(): return egCtx.Err() case limit <- struct{}{}: } - name := file + if len(toRemoveFiles) < batch { + batch = len(toRemoveFiles) + } + files := toRemoveFiles[:batch] eg.Go(func() error { defer func() { <-limit }() - err := extStorage.DeleteFile(egCtx, name) + err := extStorage.DeleteFiles(egCtx, files) if err != nil && !IsNotExistInExtStorage(err) { // if fail then retry, may end up with notExit err, ignore the error return errors.ErrExternalStorageAPI.Wrap(err) } return nil }) + toRemoveFiles = toRemoveFiles[batch:] } return eg.Wait() } From 990c6ef7fe519f60a1caac124ae456e57a062e01 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 21 Nov 2023 14:00:20 +0800 Subject: [PATCH 02/10] add integration test --- .../storage_cleanup/conf/changefeed.toml | 21 +++++ .../storage_cleanup/conf/diff_config.toml | 29 +++++++ .../storage_cleanup/data/data.sql | 84 +++++++++++++++++++ .../storage_cleanup/data/schema.sql | 64 ++++++++++++++ .../integration_tests/storage_cleanup/run.sh | 34 ++++++++ 5 files changed, 232 insertions(+) create mode 100644 tests/integration_tests/storage_cleanup/conf/changefeed.toml create mode 100644 tests/integration_tests/storage_cleanup/conf/diff_config.toml create mode 100644 tests/integration_tests/storage_cleanup/data/data.sql create mode 100644 tests/integration_tests/storage_cleanup/data/schema.sql create mode 100644 tests/integration_tests/storage_cleanup/run.sh diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed.toml b/tests/integration_tests/storage_cleanup/conf/changefeed.toml new file mode 100644 index 00000000000..bf998c6d404 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed.toml @@ -0,0 +1,21 @@ +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true + +[sink.cloud-storage-config] +file-expiration-days = 1 +# Second | Minute | Hour | Dom | Month | DowOptional +file-cleanup-cron-spec = "* * * * * *" \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/conf/diff_config.toml b/tests/integration_tests/storage_cleanup/conf/diff_config.toml new file mode 100644 index 00000000000..4ff72cef8f9 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/storage_cleanup/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/storage_cleanup/data/data.sql b/tests/integration_tests/storage_cleanup/data/data.sql new file mode 100644 index 00000000000..cec3db763d5 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/data.sql @@ -0,0 +1,84 @@ +use `test`; +-- make sure `nullable` can be handled properly. +INSERT INTO multi_data_type() VALUES (); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -1, 1, -129, 129, -65536, 65536, -16777216, 16777216, -2147483649, 2147483649 + , true, 123.456, 123.123, 123456789012.123456789012 + , '测', '测试', x'89504E470D0A1A0A', x'89504E470D0A1A0A', '测试tinytext', '测试text', '测试mediumtext', '测试longtext' + , 'tinyblob', 'blob', 'mediumblob', 'longblob' + , '1977-01-01', '9999-12-31 23:59:59', '19731230153000', '23:59:59', 2022 + , 'enum2', 1 + , 'a,b', NULL); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -2, 2, -130, 130, -65537, 65537, -16777217, 16777217, -2147483650, 2147483650 + , false, 123.4567, 123.1237, 123456789012.1234567890127 + , '2', '测试2', x'89504E470D0A1A0B', x'89504E470D0A1A0B', '测试2tinytext', '测试2text', '测试2mediumtext', '测试longtext' + , 'tinyblob2', 'blob2', 'mediumblob2', 'longblob2' + , '2021-01-01', '2021-12-31 23:59:59', '19731230153000', '22:59:59', 2021 + , 'enum1', 2 + , 'a,b,c', '{ + "id": 1, + "name": "hello" + }'); + +UPDATE multi_data_type +SET t_boolean = false +WHERE id = 1; + +DELETE +FROM multi_data_type +WHERE id = 3; + +INSERT INTO multi_charset +VALUES (1, '测试', "中国", "上海", "你好,世界" + , 0xC4E3BAC3CAC0BDE7); + +INSERT INTO multi_charset +VALUES (2, '部署', "美国", "纽约", "世界,你好" + , 0xCAC0BDE7C4E3BAC3); + +UPDATE multi_charset +SET name = '开发' +WHERE name = '测试'; + +DELETE FROM multi_charset +WHERE name = '部署' + AND country = '美国' + AND city = '纽约' + AND description = '世界,你好'; + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + 
x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + 
x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); diff --git a/tests/integration_tests/storage_cleanup/data/schema.sql b/tests/integration_tests/storage_cleanup/data/schema.sql new file mode 100644 index 00000000000..d6934693e4e --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/schema.sql @@ -0,0 +1,64 @@ +USE `test`; + +CREATE TABLE multi_data_type +( + id INT AUTO_INCREMENT, + t_tinyint TINYINT, + t_tinyint_unsigned TINYINT UNSIGNED, + t_smallint SMALLINT, + t_smallint_unsigned SMALLINT UNSIGNED, + t_mediumint MEDIUMINT, + t_mediumint_unsigned MEDIUMINT UNSIGNED, + t_int INT, + t_int_unsigned INT UNSIGNED, + t_bigint BIGINT, + t_bigint_unsigned BIGINT UNSIGNED, + t_boolean BOOLEAN, + t_float FLOAT(6, 2), + t_double DOUBLE(6, 2), + t_decimal DECIMAL(38, 19), + t_char CHAR, + t_varchar VARCHAR(10), + c_binary binary(16), + c_varbinary varbinary(16), + t_tinytext TINYTEXT, + t_text TEXT, + t_mediumtext MEDIUMTEXT, + t_longtext LONGTEXT, + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + t_date DATE, + t_datetime DATETIME, + t_timestamp TIMESTAMP NULL, + t_time TIME, + t_year YEAR, + t_enum ENUM ('enum1', 'enum2', 'enum3'), + t_set SET ('a', 'b', 'c'), + t_bit BIT(64), + t_json JSON, + PRIMARY KEY (id) +); + +CREATE TABLE multi_charset ( + id INT, + name varchar(128) CHARACTER SET gbk, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob, + PRIMARY KEY (id) +) ENGINE = InnoDB CHARSET = utf8mb4; + +CREATE TABLE binary_columns +( + id INT AUTO_INCREMENT, + c_binary binary(255), + c_varbinary varbinary(255), + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + PRIMARY KEY (id) +); \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh new file mode 100644 index 00000000000..90e7f5849c1 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" 
--config=$CUR/conf/changefeed.toml + + run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From e74e9d45bdc5da29f2b68154a30c7906adb25751 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 21 Nov 2023 16:54:47 +0800 Subject: [PATCH 03/10] remove empty dirs --- .../cloudstorage/cloud_storage_ddl_sink.go | 51 +++++++++++++++---- .../cloud_storage_ddl_sink_test.go | 8 +-- pkg/sink/cloudstorage/path.go | 36 +++++++++++++ pkg/sink/cloudstorage/path_test.go | 2 +- .../integration_tests/storage_cleanup/run.sh | 46 +++++++++++++++++ 5 files changed, 129 insertions(+), 14 deletions(-) diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index c4e8d84e01b..42d67f05470 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -64,7 +64,7 @@ func newDDLSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, - cleanupJob func(), + cleanupJobs []func(), ) (*DDLSink, error) { // create cloud storage config and then apply the params of sinkURI to it. cfg := cloudstorage.NewConfig() @@ -89,10 +89,13 @@ func newDDLSink(ctx context.Context, // Note: It is intended to run the cleanup goroutine in the background. // we don't wait for it to finish since the gourotine would be stuck if // the downstream is abnormal, especially when the downstream is a nfs. 
- if cleanupJob == nil { - cleanupJob = d.runCleanup(ctx) + if cleanupJobs == nil { + cleanupJobs, err = d.runCleanup(ctx, sinkURI) + if err != nil { + return nil, err + } } - go d.bgCleanup(ctx, cleanupJob) + go d.bgCleanup(ctx, cleanupJobs) return d, nil } @@ -158,7 +161,7 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, return errors.Trace(err) } -func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) { +func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) { if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 { log.Info("skip cleanup expired files for storage sink", zap.String("namespace", d.id.Namespace), @@ -169,7 +172,9 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) { } clenupCron := cron.New() - clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, cleanupJob) + for _, job := range cleanupJobs { + clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, job) + } clenupCron.Start() defer clenupCron.Stop() log.Info("start schedule cleanup expired files for storage sink", @@ -186,9 +191,34 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) { zap.Error(ctx.Err())) } -func (d *DDLSink) runCleanup(ctx context.Context) func() { +func (d *DDLSink) runCleanup(ctx context.Context, uri *url.URL) ([]func(), error) { + ret := []func(){} + if uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" { + ret = append(ret, func() { + checkpointTs := d.lastCheckpointTs.Load() + start := time.Now() + cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path) + if err != nil { + log.Error("failed to remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + } + isCleanupRunning := atomic.Bool{} - return func() { + ret = append(ret, func() { if !isCleanupRunning.CompareAndSwap(false, true) { log.Warn("cleanup expired files is already running, skip this round", zap.String("namespace", d.id.Namespace), @@ -199,7 +229,7 @@ func (d *DDLSink) runCleanup(ctx context.Context) func() { defer isCleanupRunning.Store(false) start := time.Now() checkpointTs := d.lastCheckpointTs.Load() - cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.storage, d.cfg, checkpointTs) + cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs) if err != nil { log.Error("failed to remove expired files", zap.String("namespace", d.id.Namespace), @@ -217,7 +247,8 @@ func (d *DDLSink) runCleanup(ctx context.Context) func() { zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("count", cnt), zap.Duration("cost", time.Since(start))) - } + }) + return ret, nil } // Close closes the sink. 
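A minimal, self-contained sketch (not part of the patch) of the scheduling pattern used above, assuming github.com/robfig/cron v1, whose six-field specs carry a seconds column; the job body here is only a stand-in for the real cleanup closures:

package main

import (
	"log"
	"time"

	"github.com/robfig/cron"
)

func main() {
	c := cron.New()
	// "* * * * * *" fires every second (handy for tests); the production
	// default in this series is "0 0 2 * * *", i.e. 02:00:00 every day.
	if err := c.AddFunc("* * * * * *", func() { log.Println("run cleanup") }); err != nil {
		log.Fatal(err)
	}
	c.Start()
	defer c.Stop()
	time.Sleep(3 * time.Second) // let the job fire a few times before exiting
}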
diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index b4cc70ae47b..f0fbe8475d0 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -159,10 +159,12 @@ func TestCleanupExpiredFiles(t *testing.T) { require.Nil(t, err) cnt := 0 - cleanupJob := func() { - cnt++ + cleanupJobs := []func(){ + func() { + cnt++ + }, } - sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJob) + sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJobs) require.Nil(t, err) _ = sink diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index 628dabb7d3e..d792845d0cd 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -17,7 +17,10 @@ import ( "context" "fmt" "io" + "io/fs" + "os" "path" + "path/filepath" "regexp" "strconv" "strings" @@ -412,6 +415,7 @@ var dateSeparatorDayRegexp *regexp.Regexp // RemoveExpiredFiles removes expired files from external storage. func RemoveExpiredFiles( ctx context.Context, + _ model.ChangeFeedID, storage storage.ExternalStorage, cfg *Config, checkpointTs model.Ts, @@ -439,3 +443,35 @@ func RemoveExpiredFiles( }, nil) return cnt, err } + +func RemoveEmptyDirs( + ctx context.Context, + id model.ChangeFeedID, + target string, +) (uint64, error) { + cnt := uint64(0) + err := filepath.Walk(target, func(path string, info fs.FileInfo, err error) error { + if os.IsNotExist(err) || path == target || info == nil { + // if path not exists, we should return nil to continue. + return nil + } + if err != nil { + return err + } + if info.IsDir() { + files, err := os.ReadDir(path) + if err == nil && len(files) == 0 { + log.Debug("Deleting empty directory", + zap.String("namespace", id.Namespace), + zap.String("changeFeedID", id.ID), + zap.String("path", path)) + os.Remove(path) + cnt++ + return filepath.SkipDir + } + } + return nil + }) + + return cnt, err +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 4dfa67d1060..8162dbb61b5 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -410,7 +410,7 @@ func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC) checkpointTs := oracle.GoTimeToTS(currTime) - cnt, err := RemoveExpiredFiles(ctx, storage, cfg, checkpointTs) + cnt, err := RemoveExpiredFiles(ctx, model.ChangeFeedID{}, storage, cfg, checkpointTs) require.NoError(t, err) require.Equal(t, uint64(16), cnt) } diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh index 90e7f5849c1..833fbc05308 100644 --- a/tests/integration_tests/storage_cleanup/run.sh +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -8,6 +8,47 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function generate_single_table_files() { + local workdir=$1 + local bucket=$2 + local schema=$3 + local table=$4 + local day=$5 + local file_cnt=$6 + + table_dir=$workdir/$bucket/$schema/$table/$day + mkdir -p $table_dir + for i in $(seq 1 $file_cnt); do + touch $table_dir/$i.data + done + + mkdir -p $table_dir/meta + touch $table_dir/meta/CDC.index +} + +function generate_historic_files() { + local target_bucket="storage_test" + # historic files of table in schema.sql + generate_single_table_files $WORK_DIR $target_bucket 
test multi_data_type 2022-01-01 10 + generate_single_table_files $WORK_DIR $target_bucket test multi_charset 2022-01-02 10 + generate_single_table_files $WORK_DIR $target_bucket test binary_columns 2022-01-03 10 + + # historic files of tables in test but not in schema.sql + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy 2022-01-01 10 + generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy 2022-01-02 10 + generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy 2022-01-03 10 + + # historic files of table belongs to different schema + generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type 2022-01-01 10 + generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset 2022-01-02 10 + generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns 2022-01-03 10 + + # historic files in different bucket, which should not be cleaned + generate_single_table_files $WORK_DIR storage_test2 test multi_data_type 2022-01-01 10 + generate_single_table_files $WORK_DIR storage_test2 test multi_charset 2022-01-02 10 + generate_single_table_files $WORK_DIR storage_test2 test binary_columns 2022-01-03 10 +} + function run() { if [ "$SINK_TYPE" != "storage" ]; then return @@ -18,6 +59,11 @@ function run() { cd $WORK_DIR run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + generate_historic_files + + # wait input + read -p "Press enter to continue" + SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s" run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml From b86a6f8209a1da39c8d8397dc008184cb7e609d6 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 21 Nov 2023 18:45:52 +0800 Subject: [PATCH 04/10] add integrtion test --- .../storage_cleanup/conf/changefeed.toml | 1 + .../integration_tests/storage_cleanup/run.sh | 76 +++++++++++++++---- 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed.toml b/tests/integration_tests/storage_cleanup/conf/changefeed.toml index bf998c6d404..60663c5d577 100644 --- a/tests/integration_tests/storage_cleanup/conf/changefeed.toml +++ b/tests/integration_tests/storage_cleanup/conf/changefeed.toml @@ -18,4 +18,5 @@ include-commit-ts = true [sink.cloud-storage-config] file-expiration-days = 1 # Second | Minute | Hour | Dom | Month | DowOptional +# cleanup every second file-cleanup-cron-spec = "* * * * * *" \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh index 833fbc05308..31dcc8a5fdc 100644 --- a/tests/integration_tests/storage_cleanup/run.sh +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -1,6 +1,6 @@ #!/bin/bash -set -eu +set -eux CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source $CUR/../_utils/test_prepare @@ -8,6 +8,9 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 + +EXIST_FILES=() +CLEANED_FILES=() function generate_single_table_files() { local workdir=$1 local bucket=$2 @@ -15,11 +18,18 @@ function generate_single_table_files() { local table=$4 local day=$5 local file_cnt=$6 + local should_clean=$7 # true or false table_dir=$workdir/$bucket/$schema/$table/$day mkdir -p $table_dir for i in $(seq 1 $file_cnt); do - touch $table_dir/$i.data + file=$table_dir/$i.data + touch $file + if [ "$should_clean" == "true" ]; then + CLEANED_FILES+=($file) + else + EXIST_FILES+=($file) + fi done mkdir -p $table_dir/meta @@ -28,25 +38,52 @@ 
function generate_single_table_files() { function generate_historic_files() { local target_bucket="storage_test" + yesterday=$(date -d "yesterday" +"%Y-%m-%d") # should not be cleaned since file-expiration-days is 1 + day_before_yesterday=$(date -d "2 days ago" +"%Y-%m-%d") # should be cleaned + # historic files of table in schema.sql - generate_single_table_files $WORK_DIR $target_bucket test multi_data_type 2022-01-01 10 - generate_single_table_files $WORK_DIR $target_bucket test multi_charset 2022-01-02 10 - generate_single_table_files $WORK_DIR $target_bucket test binary_columns 2022-01-03 10 + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type $yesterday 10 false + generate_single_table_files $WORK_DIR $target_bucket test multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns $day_before_yesterday 10 true # historic files of tables in test but not in schema.sql - generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy 2022-01-01 10 - generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy 2022-01-02 10 - generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy 2022-01-03 10 + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy $yesterday 10 false # historic files of table belongs to different schema - generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type 2022-01-01 10 - generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset 2022-01-02 10 - generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns 2022-01-03 10 - - # historic files in different bucket, which should not be cleaned - generate_single_table_files $WORK_DIR storage_test2 test multi_data_type 2022-01-01 10 - generate_single_table_files $WORK_DIR storage_test2 test multi_charset 2022-01-02 10 - generate_single_table_files $WORK_DIR storage_test2 test binary_columns 2022-01-03 10 + generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns $yesterday 10 false + + # historic files in default bucket, which should not be cleaned + generate_single_table_files $WORK_DIR storage_test_default test multi_data_type 2022-01-01 10 false + generate_single_table_files $WORK_DIR storage_test_default test multi_charset 2022-01-02 10 false + generate_single_table_files $WORK_DIR storage_test_default test binary_columns 2022-01-03 10 false +} + +function check_file_exists() { + local all_should_exist=$1 + for f in ${EXIST_FILES[@]}; do + if [ ! -f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + done + + for f in ${CLEANED_FILES[@]}; do + if [ "$all_should_exist" == "true" ]; then + if [ ! 
-f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + else + if [ -f $f ]; then + echo "file $f should not exist but exists" + exit 1 + fi + fi + done } function run() { @@ -61,11 +98,18 @@ function run() { generate_historic_files + SINK_URI_DEFAULT="file://$WORK_DIR/storage_test_default?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI_DEFAULT" -c "default-config-test" --config=$CUR/conf/changefeed-default.toml + sleep 20 + check_file_exists true + # wait input read -p "Press enter to continue" SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s" run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml + sleep 20 + check_file_exists false run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} From 6342dd9033021a44076e703f1ba1b8b67ab6190b Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 21 Nov 2023 18:46:00 +0800 Subject: [PATCH 05/10] add integrtion test --- tests/integration_tests/run_group.sh | 2 +- .../storage_cleanup/conf/changefeed-default.toml | 16 ++++++++++++++++ tests/integration_tests/storage_cleanup/run.sh | 7 +++---- 3 files changed, 20 insertions(+), 5 deletions(-) create mode 100644 tests/integration_tests/storage_cleanup/conf/changefeed-default.toml diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 8dabad44af3..0a394227ebb 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -19,7 +19,7 @@ kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic canal_jso kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" storage_only="lossy_ddl storage_csv_update" -storage_only_csv="csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" +storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table" # Define groups diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml new file mode 100644 index 00000000000..74f1f934a8a --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml @@ -0,0 +1,16 @@ +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. 
+include-commit-ts = true diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh index 31dcc8a5fdc..e0729f25e4b 100644 --- a/tests/integration_tests/storage_cleanup/run.sh +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -8,7 +8,6 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 - EXIST_FILES=() CLEANED_FILES=() function generate_single_table_files() { @@ -23,7 +22,7 @@ function generate_single_table_files() { table_dir=$workdir/$bucket/$schema/$table/$day mkdir -p $table_dir for i in $(seq 1 $file_cnt); do - file=$table_dir/$i.data + file=$table_dir/$i.data touch $file if [ "$should_clean" == "true" ]; then CLEANED_FILES+=($file) @@ -38,8 +37,8 @@ function generate_single_table_files() { function generate_historic_files() { local target_bucket="storage_test" - yesterday=$(date -d "yesterday" +"%Y-%m-%d") # should not be cleaned since file-expiration-days is 1 - day_before_yesterday=$(date -d "2 days ago" +"%Y-%m-%d") # should be cleaned + yesterday=$(date -d "yesterday" +"%Y-%m-%d") # should not be cleaned since file-expiration-days is 1 + day_before_yesterday=$(date -d "2 days ago" +"%Y-%m-%d") # should be cleaned # historic files of table in schema.sql generate_single_table_files $WORK_DIR $target_bucket test multi_data_type $yesterday 10 false From ecd227742ea872d40e6f8c7feda1ff2e9d56886c Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 22 Nov 2023 16:44:19 +0800 Subject: [PATCH 06/10] fix ut --- cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go | 8 ++++---- tests/integration_tests/storage_cleanup/run.sh | 3 --- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 42d67f05470..6ac7217e24b 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -166,8 +166,8 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) { log.Info("skip cleanup expired files for storage sink", zap.String("namespace", d.id.Namespace), zap.String("changefeedID", d.id.ID), - zap.String("date-separator", d.cfg.DateSeparator), - zap.Int("expired-file-ttl", d.cfg.FileExpirationDays)) + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) return } @@ -180,8 +180,8 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) { log.Info("start schedule cleanup expired files for storage sink", zap.String("namespace", d.id.Namespace), zap.String("changefeedID", d.id.ID), - zap.String("date-separator", d.cfg.DateSeparator), - zap.Int("expired-file-ttl", d.cfg.FileExpirationDays)) + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) // wait for the context done <-ctx.Done() diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh index e0729f25e4b..0e724226820 100644 --- a/tests/integration_tests/storage_cleanup/run.sh +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -102,9 +102,6 @@ function run() { sleep 20 check_file_exists true - # wait input - read -p "Press enter to continue" - SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s" run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml sleep 20 From c8982f0d3a7db24866d289483979e992e4082e82 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Thu, 23 
Nov 2023 12:19:05 +0800 Subject: [PATCH 07/10] fix ut --- .../cloudstorage/cloud_storage_ddl_sink.go | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 6ac7217e24b..4cfbe9da721 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -46,6 +46,7 @@ type DDLSink struct { statistics *metrics.Statistics storage storage.ExternalStorage cfg *cloudstorage.Config + cron *cron.Cron lastCheckpointTs atomic.Uint64 lastSendCheckpointTsTime time.Time @@ -70,7 +71,7 @@ func newDDLSink(ctx context.Context, cfg := cloudstorage.NewConfig() err := cfg.Apply(ctx, sinkURI, replicaConfig) if err != nil { - return nil, err + return nil, errors.Trace(err) } storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String()) @@ -86,16 +87,13 @@ func newDDLSink(ctx context.Context, lastSendCheckpointTsTime: time.Now(), } + if err := d.initCron(ctx, sinkURI, cleanupJobs); err != nil { + return nil, errors.Trace(err) + } // Note: It is intended to run the cleanup goroutine in the background. // we don't wait for it to finish since the gourotine would be stuck if // the downstream is abnormal, especially when the downstream is a nfs. - if cleanupJobs == nil { - cleanupJobs, err = d.runCleanup(ctx, sinkURI) - if err != nil { - return nil, err - } - } - go d.bgCleanup(ctx, cleanupJobs) + go d.bgCleanup(ctx) return d, nil } @@ -161,7 +159,27 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, return errors.Trace(err) } -func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) { +func (d *DDLSink) initCron( + ctx context.Context, sinkURI *url.URL, cleanupJobs []func(), +) (err error) { + if cleanupJobs == nil { + cleanupJobs, err = d.genCleanupJob(ctx, sinkURI) + if err != nil { + return err + } + } + + d.cron = cron.New() + for _, job := range cleanupJobs { + err = d.cron.AddFunc(d.cfg.FileCleanupCronSpec, job) + if err != nil { + return err + } + } + return nil +} + +func (d *DDLSink) bgCleanup(ctx context.Context) { if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 { log.Info("skip cleanup expired files for storage sink", zap.String("namespace", d.id.Namespace), @@ -171,12 +189,8 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) { return } - clenupCron := cron.New() - for _, job := range cleanupJobs { - clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, job) - } - clenupCron.Start() - defer clenupCron.Stop() + d.cron.Start() + defer d.cron.Stop() log.Info("start schedule cleanup expired files for storage sink", zap.String("namespace", d.id.Namespace), zap.String("changefeedID", d.id.ID), @@ -191,7 +205,7 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) { zap.Error(ctx.Err())) } -func (d *DDLSink) runCleanup(ctx context.Context, uri *url.URL) ([]func(), error) { +func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) ([]func(), error) { ret := []func(){} if uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" { ret = append(ret, func() { From cb10c785e4e5c376fb858b239a5b217ebbf61dab Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 24 Nov 2023 10:03:52 +0800 Subject: [PATCH 08/10] fix ut --- .../cloudstorage/cloud_storage_ddl_sink.go | 24 ++++++++++++------- pkg/sink/cloudstorage/path.go | 1 + pkg/sink/cloudstorage/path_test.go | 1 + 3 files changed, 
17 insertions(+), 9 deletions(-) diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 4cfbe9da721..66c116ae402 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -65,7 +65,7 @@ func newDDLSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, - cleanupJobs []func(), + cleanupJobs []func(), /* only for test */ ) (*DDLSink, error) { // create cloud storage config and then apply the params of sinkURI to it. cfg := cloudstorage.NewConfig() @@ -163,10 +163,7 @@ func (d *DDLSink) initCron( ctx context.Context, sinkURI *url.URL, cleanupJobs []func(), ) (err error) { if cleanupJobs == nil { - cleanupJobs, err = d.genCleanupJob(ctx, sinkURI) - if err != nil { - return err - } + cleanupJobs = d.genCleanupJob(ctx, sinkURI) } d.cron = cron.New() @@ -205,10 +202,20 @@ func (d *DDLSink) bgCleanup(ctx context.Context) { zap.Error(ctx.Err())) } -func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) ([]func(), error) { +func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) []func() { ret := []func(){} - if uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" { + + isLocal := uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" + isRemoveEmptyDirsRuning := atomic.Bool{} + if isLocal { ret = append(ret, func() { + if !isRemoveEmptyDirsRuning.CompareAndSwap(false, true) { + log.Warn("remove empty dirs is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + checkpointTs := d.lastCheckpointTs.Load() start := time.Now() cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path) @@ -254,7 +261,6 @@ func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) ([]func(), er ) return } - log.Info("remove expired files", zap.String("namespace", d.id.Namespace), zap.String("changefeedID", d.id.ID), @@ -262,7 +268,7 @@ func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) ([]func(), er zap.Uint64("count", cnt), zap.Duration("cost", time.Since(start))) }) - return ret, nil + return ret } // Close closes the sink. diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index d792845d0cd..f72d2fb1a55 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -444,6 +444,7 @@ func RemoveExpiredFiles( return cnt, err } +// RemoveEmptyDirs removes empty directories from external storage. 
func RemoveEmptyDirs( ctx context.Context, id model.ChangeFeedID, diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 8162dbb61b5..6cb74439fe5 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -351,6 +351,7 @@ func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { } cfg := NewConfig() err = cfg.Apply(ctx, sinkURI, replicaConfig) + require.NoError(t, err) // generate some expired files filesWithoutPartition := []string{ From cf82c329776e87ff6769918bca9f7bf638636bed Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 24 Nov 2023 10:26:49 +0800 Subject: [PATCH 09/10] fix check err --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7f358557ccf..5967048505b 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/prometheus/client_model v0.4.0 github.com/r3labs/diff v1.1.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 + github.com/robfig/cron v1.2.0 github.com/segmentio/kafka-go v0.4.41-0.20230526171612-f057b1d369cd github.com/shirou/gopsutil/v3 v3.23.5 github.com/shopspring/decimal v1.3.0 @@ -147,7 +148,6 @@ require ( github.com/jfcg/sixb v1.3.8 // indirect github.com/jfcg/sorty/v2 v2.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect - github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.2.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect From 642bcfbc8e9ebb443aad00b63c2f7d4db5957831 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 24 Nov 2023 10:57:30 +0800 Subject: [PATCH 10/10] fix data race --- .../ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index f0fbe8475d0..48c3d1cb714 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -19,6 +19,7 @@ import ( "net/url" "os" "path" + "sync/atomic" "testing" "time" @@ -158,10 +159,10 @@ func TestCleanupExpiredFiles(t *testing.T) { err = replicaConfig.ValidateAndAdjust(sinkURI) require.Nil(t, err) - cnt := 0 + cnt := atomic.Int64{} cleanupJobs := []func(){ func() { - cnt++ + cnt.Add(1) }, } sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJobs) @@ -169,5 +170,5 @@ func TestCleanupExpiredFiles(t *testing.T) { _ = sink time.Sleep(3 * time.Second) - require.LessOrEqual(t, 1, cnt) + require.LessOrEqual(t, int64(1), cnt.Load()) }
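
For reference, the retention rule exercised by generate_historic_files in the storage_cleanup test (with file-expiration-days=1, yesterday's partition is kept and the day-before-yesterday partition is cleaned) reduces to a date cutoff on the day-separated path. The helper below is a minimal, hypothetical sketch of that rule; it is not the actual RemoveExpiredFiles implementation.

package main

import (
	"fmt"
	"time"
)

// isExpired mirrors the expectation encoded in the test comments: a "YYYY-MM-DD"
// day partition is expired once it is older than now minus file-expiration-days.
// Illustrative only; not code from pkg/sink/cloudstorage.
func isExpired(dayDir string, now time.Time, expirationDays int) bool {
	cutoff := now.AddDate(0, 0, -expirationDays).Format("2006-01-02")
	// Lexicographic comparison is safe for zero-padded dates.
	return dayDir < cutoff
}

func main() {
	now := time.Now()
	yesterday := now.AddDate(0, 0, -1).Format("2006-01-02")
	dayBeforeYesterday := now.AddDate(0, 0, -2).Format("2006-01-02")
	fmt.Println(isExpired(yesterday, now, 1))          // false: kept
	fmt.Println(isExpired(dayBeforeYesterday, now, 1)) // true: cleaned
}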
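The initCron/bgCleanup split in patch 07 registers every cleanup job with a single robfig/cron v1 scheduler and only starts it once the date separator is "day" and FileExpirationDays is positive. Below is a self-contained sketch of that scheduling pattern; the "@every 1s" spec and the log statement are placeholders standing in for the configured FileCleanupCronSpec and the real cleanup jobs.

package main

import (
	"log"
	"time"

	"github.com/robfig/cron"
)

func main() {
	c := cron.New()
	// robfig/cron v1 accepts @-descriptors as well as second-granularity
	// cron expressions; in the sink the spec comes from FileCleanupCronSpec.
	if err := c.AddFunc("@every 1s", func() {
		log.Println("cleanup expired files")
	}); err != nil {
		log.Fatal(err)
	}
	c.Start()
	defer c.Stop()

	// Stand-in for bgCleanup blocking on <-ctx.Done().
	time.Sleep(3 * time.Second)
}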
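Patch 08 guards the local remove-empty-dirs job with an atomic.Bool so that a slow round (for example on NFS) is skipped rather than stacked when the next cron tick fires. The sketch below shows the same CompareAndSwap pattern in isolation; the explicit Store(false) release is added here to make the example complete and is not visible in the excerpted hunk.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runExclusive lets at most one invocation of job run at a time; concurrent
// callers bail out instead of queueing behind a slow round.
func runExclusive(running *atomic.Bool, job func()) {
	if !running.CompareAndSwap(false, true) {
		fmt.Println("previous round still running, skip")
		return
	}
	defer running.Store(false) // release the guard once the round finishes
	job()
}

func main() {
	var running atomic.Bool
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			runExclusive(&running, func() { fmt.Println("cleanup round") })
		}()
	}
	wg.Wait()
}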
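The data-race fix in patch 10 works because the injected cleanup job increments the counter on the cron scheduler's goroutine while the test goroutine reads it after the sleep; atomic.Int64 makes both sides race-free. A minimal reproduction of the pattern, which the race detector (go test -race) flags when a plain int is used instead:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var cnt atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			cnt.Add(1) // a plain cnt++ here is reported by the race detector
		}()
	}
	wg.Wait()
	fmt.Println(cnt.Load()) // always 10
}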