diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
index c4e8d84e01b..42d67f05470 100644
--- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
+++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
@@ -64,7 +64,7 @@ func newDDLSink(ctx context.Context,
 	changefeedID model.ChangeFeedID,
 	sinkURI *url.URL,
 	replicaConfig *config.ReplicaConfig,
-	cleanupJob func(),
+	cleanupJobs []func(),
 ) (*DDLSink, error) {
 	// create cloud storage config and then apply the params of sinkURI to it.
 	cfg := cloudstorage.NewConfig()
@@ -89,10 +89,13 @@ func newDDLSink(ctx context.Context,
 	// Note: It is intended to run the cleanup goroutine in the background.
 	// we don't wait for it to finish since the gourotine would be stuck if
 	// the downstream is abnormal, especially when the downstream is a nfs.
-	if cleanupJob == nil {
-		cleanupJob = d.runCleanup(ctx)
+	if cleanupJobs == nil {
+		cleanupJobs, err = d.runCleanup(ctx, sinkURI)
+		if err != nil {
+			return nil, err
+		}
 	}
-	go d.bgCleanup(ctx, cleanupJob)
+	go d.bgCleanup(ctx, cleanupJobs)
 	return d, nil
 }
 
@@ -158,7 +161,7 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context,
 	return errors.Trace(err)
 }
 
-func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) {
+func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) {
 	if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 {
 		log.Info("skip cleanup expired files for storage sink",
 			zap.String("namespace", d.id.Namespace),
@@ -169,7 +172,9 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) {
 	}
 
 	clenupCron := cron.New()
-	clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, cleanupJob)
+	for _, job := range cleanupJobs {
+		clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, job)
+	}
 	clenupCron.Start()
 	defer clenupCron.Stop()
 	log.Info("start schedule cleanup expired files for storage sink",
@@ -186,9 +191,34 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) {
 		zap.Error(ctx.Err()))
 }
 
-func (d *DDLSink) runCleanup(ctx context.Context) func() {
+func (d *DDLSink) runCleanup(ctx context.Context, uri *url.URL) ([]func(), error) {
+	ret := []func(){}
+	if uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" {
+		ret = append(ret, func() {
+			checkpointTs := d.lastCheckpointTs.Load()
+			start := time.Now()
+			cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path)
+			if err != nil {
+				log.Error("failed to remove empty dirs",
+					zap.String("namespace", d.id.Namespace),
+					zap.String("changefeedID", d.id.ID),
+					zap.Uint64("checkpointTs", checkpointTs),
+					zap.Duration("cost", time.Since(start)),
+					zap.Error(err),
+				)
+				return
+			}
+			log.Info("remove empty dirs",
+				zap.String("namespace", d.id.Namespace),
+				zap.String("changefeedID", d.id.ID),
+				zap.Uint64("checkpointTs", checkpointTs),
+				zap.Uint64("count", cnt),
+				zap.Duration("cost", time.Since(start)))
+		})
+	}
+
 	isCleanupRunning := atomic.Bool{}
-	return func() {
+	ret = append(ret, func() {
 		if !isCleanupRunning.CompareAndSwap(false, true) {
 			log.Warn("cleanup expired files is already running, skip this round",
 				zap.String("namespace", d.id.Namespace),
@@ -199,7 +229,7 @@ func (d *DDLSink) runCleanup(ctx context.Context) func() {
 		defer isCleanupRunning.Store(false)
 		start := time.Now()
 		checkpointTs := d.lastCheckpointTs.Load()
-		cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.storage, d.cfg, checkpointTs)
+		cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs)
 		if err != nil {
 			log.Error("failed to remove expired files",
 				zap.String("namespace", d.id.Namespace),
@@ -217,7 +247,8 @@ func (d *DDLSink) runCleanup(ctx context.Context) func() {
 			zap.Uint64("checkpointTs", checkpointTs),
 			zap.Uint64("count", cnt),
 			zap.Duration("cost", time.Since(start)))
-	}
+	})
+	return ret, nil
 }
 
 // Close closes the sink.
diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
index b4cc70ae47b..f0fbe8475d0 100644
--- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
+++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
@@ -159,10 +159,12 @@ func TestCleanupExpiredFiles(t *testing.T) {
 	require.Nil(t, err)
 
 	cnt := 0
-	cleanupJob := func() {
-		cnt++
+	cleanupJobs := []func(){
+		func() {
+			cnt++
+		},
 	}
-	sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJob)
+	sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJobs)
 	require.Nil(t, err)
 
 	_ = sink
diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go
index 628dabb7d3e..d792845d0cd 100644
--- a/pkg/sink/cloudstorage/path.go
+++ b/pkg/sink/cloudstorage/path.go
@@ -17,7 +17,10 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"io/fs"
+	"os"
 	"path"
+	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
@@ -412,6 +415,7 @@ var dateSeparatorDayRegexp *regexp.Regexp
 // RemoveExpiredFiles removes expired files from external storage.
 func RemoveExpiredFiles(
 	ctx context.Context,
+	_ model.ChangeFeedID,
 	storage storage.ExternalStorage,
 	cfg *Config,
 	checkpointTs model.Ts,
@@ -439,3 +443,35 @@ func RemoveExpiredFiles(
 	}, nil)
 	return cnt, err
 }
+
+func RemoveEmptyDirs(
+	ctx context.Context,
+	id model.ChangeFeedID,
+	target string,
+) (uint64, error) {
+	cnt := uint64(0)
+	err := filepath.Walk(target, func(path string, info fs.FileInfo, err error) error {
+		if os.IsNotExist(err) || path == target || info == nil {
+			// if path not exists, we should return nil to continue.
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		if info.IsDir() {
+			files, err := os.ReadDir(path)
+			if err == nil && len(files) == 0 {
+				log.Debug("Deleting empty directory",
+					zap.String("namespace", id.Namespace),
+					zap.String("changeFeedID", id.ID),
+					zap.String("path", path))
+				os.Remove(path)
+				cnt++
+				return filepath.SkipDir
+			}
+		}
+		return nil
+	})
+
+	return cnt, err
+}
diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go
index 4dfa67d1060..8162dbb61b5 100644
--- a/pkg/sink/cloudstorage/path_test.go
+++ b/pkg/sink/cloudstorage/path_test.go
@@ -410,7 +410,7 @@ func TestRemoveExpiredFilesWithoutPartition(t *testing.T) {
 
 	currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC)
 	checkpointTs := oracle.GoTimeToTS(currTime)
-	cnt, err := RemoveExpiredFiles(ctx, storage, cfg, checkpointTs)
+	cnt, err := RemoveExpiredFiles(ctx, model.ChangeFeedID{}, storage, cfg, checkpointTs)
 	require.NoError(t, err)
 	require.Equal(t, uint64(16), cnt)
 }
diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh
index 90e7f5849c1..833fbc05308 100644
--- a/tests/integration_tests/storage_cleanup/run.sh
+++ b/tests/integration_tests/storage_cleanup/run.sh
@@ -8,6 +8,47 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
 CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
+function generate_single_table_files() {
+	local workdir=$1
+	local bucket=$2
+	local schema=$3
+	local table=$4
+	local day=$5
+	local file_cnt=$6
+
+	table_dir=$workdir/$bucket/$schema/$table/$day
+	mkdir -p $table_dir
+	for i in $(seq 1 $file_cnt); do
+		touch $table_dir/$i.data
+	done
+
+	mkdir -p $table_dir/meta
+	touch $table_dir/meta/CDC.index
+}
+
+function generate_historic_files() {
+	local target_bucket="storage_test"
+	# historic files of table in schema.sql
+	generate_single_table_files $WORK_DIR $target_bucket test multi_data_type 2022-01-01 10
+	generate_single_table_files $WORK_DIR $target_bucket test multi_charset 2022-01-02 10
+	generate_single_table_files $WORK_DIR $target_bucket test binary_columns 2022-01-03 10
+
+	# historic files of tables in test but not in schema.sql
+	generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy 2022-01-01 10
+	generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy 2022-01-02 10
+	generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy 2022-01-03 10
+
+	# historic files of table belongs to different schema
+	generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type 2022-01-01 10
+	generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset 2022-01-02 10
+	generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns 2022-01-03 10
+
+	# historic files in different bucket, which should not be cleaned
+	generate_single_table_files $WORK_DIR storage_test2 test multi_data_type 2022-01-01 10
+	generate_single_table_files $WORK_DIR storage_test2 test multi_charset 2022-01-02 10
+	generate_single_table_files $WORK_DIR storage_test2 test binary_columns 2022-01-03 10
+}
+
 function run() {
 	if [ "$SINK_TYPE" != "storage" ]; then
 		return
@@ -18,6 +59,11 @@ function run() {
 	cd $WORK_DIR
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
 
+	generate_historic_files
+
+	# wait input
+	read -p "Press enter to continue"
+
 	SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s"
 	run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml
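Note: below is a minimal, self-contained sketch (not TiCDC code) of the scheduling pattern this change adopts in bgCleanup: every closure in the cleanupJobs slice is registered under the same cron spec, so the new local-scheme "remove empty dirs" job and the existing "remove expired files" job fire on the same schedule. It assumes robfig/cron v3; the "@every 1s" spec, the package layout, and the job bodies are placeholders for illustration only.

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// Stand-ins for the real cleanup closures returned by runCleanup.
	cleanupJobs := []func(){
		func() { fmt.Println("remove expired files") },
		func() { fmt.Println("remove empty dirs") },
	}

	c := cron.New()
	// Register every job on the same schedule, mirroring the loop added in bgCleanup.
	for _, job := range cleanupJobs {
		if _, err := c.AddFunc("@every 1s", job); err != nil {
			panic(err)
		}
	}
	c.Start()
	defer c.Stop()

	time.Sleep(3 * time.Second) // let the jobs fire a few times before exiting
}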
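Note: the next sketch exercises the same filepath.Walk sweep that the new RemoveEmptyDirs performs, against a throwaway temp directory; removeEmptyDirs below is a local copy for illustration, not the pkg/sink/cloudstorage function. Because Walk visits a parent before its children, a directory whose only contents are empty subdirectories is not removed in the same pass, so nested empty trees are drained over successive cron runs.

package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// removeEmptyDirs deletes directories under target that are empty at visit time
// and returns how many were removed. The root itself is never removed.
func removeEmptyDirs(target string) (uint64, error) {
	cnt := uint64(0)
	err := filepath.Walk(target, func(path string, info fs.FileInfo, err error) error {
		if os.IsNotExist(err) || path == target || info == nil {
			return nil // already gone, or the root itself: keep walking
		}
		if err != nil {
			return err
		}
		if info.IsDir() {
			entries, rerr := os.ReadDir(path)
			if rerr == nil && len(entries) == 0 {
				os.Remove(path)
				cnt++
				return filepath.SkipDir
			}
		}
		return nil
	})
	return cnt, err
}

func main() {
	dir, _ := os.MkdirTemp("", "cleanup-demo")
	defer os.RemoveAll(dir)

	os.MkdirAll(filepath.Join(dir, "schema", "table", "2022-01-01"), 0o755)
	cnt, err := removeEmptyDirs(dir)
	fmt.Println(cnt, err) // first pass removes only the deepest empty directory
}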