diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 05a7ba86a2c..33fded352ec 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -430,10 +430,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, } } @@ -710,10 +712,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, } } @@ -1192,10 +1196,12 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 0e0f6d169f8..c4e8d84e01b 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -17,9 +17,9 @@ import ( "context" "encoding/json" "net/url" + "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" timodel "github.com/pingcap/tidb/parser/model" @@ -27,9 +27,11 @@ import ( "github.com/pingcap/tiflow/cdc/sink/ddlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/util" + "github.com/robfig/cron" "go.uber.org/zap" ) @@ -43,8 +45,9 @@ type DDLSink struct { // statistic is used to record the DDL metrics statistics *metrics.Statistics storage storage.ExternalStorage + cfg *cloudstorage.Config - outputColumnID bool + lastCheckpointTs atomic.Uint64 lastSendCheckpointTsTime time.Time } @@ -54,6 +57,22 @@ func NewDDLSink(ctx context.Context, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, ) (*DDLSink, error) { + return newDDLSink(ctx, changefeedID, sinkURI, replicaConfig, nil) +} + +func newDDLSink(ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + cleanupJob func(), +) (*DDLSink, error) { + // create cloud storage config and then apply the params of sinkURI to it. + cfg := cloudstorage.NewConfig() + err := cfg.Apply(ctx, sinkURI, replicaConfig) + if err != nil { + return nil, err + } + storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String()) if err != nil { return nil, err @@ -63,13 +82,17 @@ func NewDDLSink(ctx context.Context, id: changefeedID, storage: storage, statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink), + cfg: cfg, lastSendCheckpointTsTime: time.Now(), } - if replicaConfig != nil && replicaConfig.Sink.CloudStorageConfig != nil { - d.outputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + // Note: It is intended to run the cleanup goroutine in the background. + // we don't wait for it to finish since the gourotine would be stuck if + // the downstream is abnormal, especially when the downstream is a nfs. + if cleanupJob == nil { + cleanupJob = d.runCleanup(ctx) } - + go d.bgCleanup(ctx, cleanupJob) return d, nil } @@ -98,7 +121,7 @@ func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error } var def cloudstorage.TableDefinition - def.FromDDLEvent(ddl, d.outputColumnID) + def.FromDDLEvent(ddl, d.cfg.OutputColumnID) if err := writeFile(def); err != nil { return errors.Trace(err) } @@ -106,7 +129,7 @@ func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error if ddl.Type == timodel.ActionExchangeTablePartition { // For exchange partition, we need to write the schema of the source table. var sourceTableDef cloudstorage.TableDefinition - sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.outputColumnID) + sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.cfg.OutputColumnID) return writeFile(sourceTableDef) } return nil @@ -125,6 +148,7 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, defer func() { d.lastSendCheckpointTsTime = time.Now() + d.lastCheckpointTs.Store(ts) }() ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts}) if err != nil { @@ -134,6 +158,68 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, return errors.Trace(err) } +func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) { + if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 { + log.Info("skip cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("date-separator", d.cfg.DateSeparator), + zap.Int("expired-file-ttl", d.cfg.FileExpirationDays)) + return + } + + clenupCron := cron.New() + clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, cleanupJob) + clenupCron.Start() + defer clenupCron.Stop() + log.Info("start schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("date-separator", d.cfg.DateSeparator), + zap.Int("expired-file-ttl", d.cfg.FileExpirationDays)) + + // wait for the context done + <-ctx.Done() + log.Info("stop schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Error(ctx.Err())) +} + +func (d *DDLSink) runCleanup(ctx context.Context) func() { + isCleanupRunning := atomic.Bool{} + return func() { + if !isCleanupRunning.CompareAndSwap(false, true) { + log.Warn("cleanup expired files is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + defer isCleanupRunning.Store(false) + start := time.Now() + checkpointTs := d.lastCheckpointTs.Load() + cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.storage, d.cfg, checkpointTs) + if err != nil { + log.Error("failed to remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + + log.Info("remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + } +} + // Close closes the sink. func (d *DDLSink) Close() { if d.statistics != nil { diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index 0e62ec24c9a..b4cc70ae47b 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -26,6 +26,8 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -33,10 +35,13 @@ func TestWriteDDLEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) - sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, nil) + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) require.Nil(t, err) ddlEvent := &model.DDLEvent{ @@ -97,10 +102,13 @@ func TestWriteCheckpointTs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) - sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, nil) + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) require.Nil(t, err) tables := []*model.TableInfo{ { @@ -132,3 +140,32 @@ func TestWriteCheckpointTs(t *testing.T) { require.Nil(t, err) require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata)) } + +func TestCleanupExpiredFiles(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.Nil(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + + cnt := 0 + cleanupJob := func() { + cnt++ + } + sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJob) + require.Nil(t, err) + + _ = sink + time.Sleep(3 * time.Second) + require.LessOrEqual(t, 1, cnt) +} diff --git a/go.mod b/go.mod index adee3545f7b..7f358557ccf 100644 --- a/go.mod +++ b/go.mod @@ -147,6 +147,7 @@ require ( github.com/jfcg/sixb v1.3.8 // indirect github.com/jfcg/sorty/v2 v2.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.2.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/go.sum b/go.sum index cd211db13e1..fd72156f024 100644 --- a/go.sum +++ b/go.sum @@ -1122,6 +1122,8 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/pkg/config/sink.go b/pkg/config/sink.go index da1174fe1e1..cc6bb9877b5 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -269,6 +269,22 @@ func (d *DateSeparator) FromString(separator string) error { return nil } +// GetPattern returns the pattern of the date separator. +func (d DateSeparator) GetPattern() string { + switch d { + case DateSeparatorNone: + return "" + case DateSeparatorYear: + return `\d{4}` + case DateSeparatorMonth: + return `\d{4}-\d{2}` + case DateSeparatorDay: + return `\d{4}-\d{2}-\d{2}` + default: + return "" + } +} + func (d DateSeparator) String() string { switch d { case DateSeparatorNone: @@ -574,7 +590,9 @@ type CloudStorageConfig struct { FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` FileSize *int `toml:"file-size" json:"file-size,omitempty"` - OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` + FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 49d159330f5..9ed6ca04c4b 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -48,6 +48,12 @@ const ( minFileSize = 1024 * 1024 // the upper limit of file size maxFileSize = 512 * 1024 * 1024 + + // disable file cleanup by default + defaultFileExpirationDays = 0 + // Second | Minute | Hour | Dom | Month | DowOptional + // `0 0 2 * * ?` means 2:00:00 AM every day + defaultFileCleanupCronSpec = "0 0 2 * * *" ) type urlConfig struct { @@ -63,6 +69,8 @@ type Config struct { FileSize int FileIndexWidth int DateSeparator string + FileExpirationDays int + FileCleanupCronSpec string EnablePartitionSeparator bool OutputColumnID bool } @@ -70,9 +78,11 @@ type Config struct { // NewConfig returns the default cloud storage sink config. func NewConfig() *Config { return &Config{ - WorkerCount: defaultWorkerCount, - FlushInterval: defaultFlushInterval, - FileSize: defaultFileSize, + WorkerCount: defaultWorkerCount, + FlushInterval: defaultFlushInterval, + FileSize: defaultFileSize, + FileExpirationDays: defaultFileExpirationDays, + FileCleanupCronSpec: defaultFileCleanupCronSpec, } } @@ -117,6 +127,12 @@ func (c *Config) Apply( c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth) if replicaConfig.Sink.CloudStorageConfig != nil { c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil { + c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays + } + if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { + c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec + } } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index 97694dc6beb..628dabb7d3e 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -21,6 +21,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" @@ -29,6 +30,8 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/util" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -403,3 +406,36 @@ func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, err return fileIdx, nil } + +var dateSeparatorDayRegexp *regexp.Regexp + +// RemoveExpiredFiles removes expired files from external storage. +func RemoveExpiredFiles( + ctx context.Context, + storage storage.ExternalStorage, + cfg *Config, + checkpointTs model.Ts, +) (uint64, error) { + if cfg.DateSeparator != config.DateSeparatorDay.String() { + return 0, nil + } + if dateSeparatorDayRegexp == nil { + dateSeparatorDayRegexp = regexp.MustCompile(config.DateSeparatorDay.GetPattern()) + } + + ttl := time.Duration(cfg.FileExpirationDays) * time.Hour * 24 + currTime := oracle.GetTimeFromTS(checkpointTs).Add(-ttl) + expiredDate := currTime.Format("2006-01-02") + + cnt := uint64(0) + err := util.RemoveFilesIf(ctx, storage, func(path string) bool { + // the path is like: /////CDC{num}.extension + match := dateSeparatorDayRegexp.FindString(path) + if match != "" && match < expiredDate { + cnt++ + return true + } + return false + }, nil) + return cnt, err +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 6e97da55cb5..4dfa67d1060 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FilePathGenerator { @@ -328,3 +329,88 @@ func TestCheckOrWriteSchema(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(cnt)) } + +func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() + uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) + storage, err := util.GetExternalStorageFromURI(ctx, uri) + require.NoError(t, err) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.DateSeparator = util.AddressOf(config.DateSeparatorDay.String()) + replicaConfig.Sink.Protocol = util.AddressOf(config.ProtocolCsv.String()) + replicaConfig.Sink.FileIndexWidth = util.AddressOf(6) + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + cfg := NewConfig() + err = cfg.Apply(ctx, sinkURI, replicaConfig) + + // generate some expired files + filesWithoutPartition := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-01/CDC000001.csv", + "schema1/table1/5/2021-01-01/CDC000002.csv", + "schema1/table1/5/2021-01-01/CDC000003.csv", + "schema1/table1/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma1-table2 + "schema1/table2/5/2021-01-01/CDC000001.csv", + "schema1/table2/5/2021-01-01/CDC000002.csv", + "schema1/table2/5/2021-01-01/CDC000003.csv", + "schema1/table2/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table2/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithoutPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesWithPartition := []string{ + // schma1-table1 + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000001.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000002.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000003.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma2-table1 + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000001.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000002.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000003.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema2/table1/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesNotExpired := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-02/CDC000001.csv", + "schema1/table1/5/2021-01-02/CDC000002.csv", + "schema1/table1/5/2021-01-02/CDC000003.csv", + "schema1/table1/5/2021-01-02/" + defaultIndexFileName, // index + // schma1-table2 + "schema1/table2/5/2021-01-02/CDC000001.csv", + "schema1/table2/5/2021-01-02/CDC000002.csv", + "schema1/table2/5/2021-01-02/CDC000003.csv", + "schema1/table2/5/2021-01-02/" + defaultIndexFileName, // index + } + for _, file := range filesNotExpired { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC) + checkpointTs := oracle.GoTimeToTS(currTime) + cnt, err := RemoveExpiredFiles(ctx, storage, cfg, checkpointTs) + require.NoError(t, err) + require.Equal(t, uint64(16), cnt) +} diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go index 19be69c0126..6ff01993967 100644 --- a/pkg/util/external_storage.go +++ b/pkg/util/external_storage.go @@ -260,38 +260,38 @@ func RemoveFilesIf( } log.Debug("Removing files", zap.Any("toRemoveFiles", toRemoveFiles)) - - for _, path := range toRemoveFiles { - if err := extStorage.DeleteFile(ctx, path); err != nil { - return errors.ErrExternalStorageAPI.Wrap(err) - } - } return DeleteFilesInExtStorage(ctx, extStorage, toRemoveFiles) } // DeleteFilesInExtStorage deletes files in external storage concurrently. +// TODO: Add a test for this function to cover batch delete. func DeleteFilesInExtStorage( ctx context.Context, extStorage storage.ExternalStorage, toRemoveFiles []string, ) error { limit := make(chan struct{}, 32) + batch := 3000 eg, egCtx := errgroup.WithContext(ctx) - for _, file := range toRemoveFiles { + for len(toRemoveFiles) > 0 { select { case <-egCtx.Done(): return egCtx.Err() case limit <- struct{}{}: } - name := file + if len(toRemoveFiles) < batch { + batch = len(toRemoveFiles) + } + files := toRemoveFiles[:batch] eg.Go(func() error { defer func() { <-limit }() - err := extStorage.DeleteFile(egCtx, name) + err := extStorage.DeleteFiles(egCtx, files) if err != nil && !IsNotExistInExtStorage(err) { // if fail then retry, may end up with notExit err, ignore the error return errors.ErrExternalStorageAPI.Wrap(err) } return nil }) + toRemoveFiles = toRemoveFiles[batch:] } return eg.Wait() }