
Commit

remove empty dirs
CharlesCheung96 committed Nov 22, 2023
1 parent 990c6ef commit e74e9d4
Showing 5 changed files with 129 additions and 14 deletions.
51 changes: 41 additions & 10 deletions cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
@@ -64,7 +64,7 @@ func newDDLSink(ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
cleanupJob func(),
cleanupJobs []func(),
) (*DDLSink, error) {
// create cloud storage config and then apply the params of sinkURI to it.
cfg := cloudstorage.NewConfig()
@@ -89,10 +89,13 @@ func newDDLSink(ctx context.Context,
// Note: It is intended to run the cleanup goroutine in the background.
// We don't wait for it to finish since the goroutine would be stuck if
// the downstream is abnormal, especially when the downstream is an NFS.
if cleanupJob == nil {
cleanupJob = d.runCleanup(ctx)
if cleanupJobs == nil {
cleanupJobs, err = d.runCleanup(ctx, sinkURI)
if err != nil {
return nil, err
}
}
go d.bgCleanup(ctx, cleanupJob)
go d.bgCleanup(ctx, cleanupJobs)
return d, nil
}

@@ -158,7 +161,7 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context,
return errors.Trace(err)
}

func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) {
func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJobs []func()) {
if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 {
log.Info("skip cleanup expired files for storage sink",
zap.String("namespace", d.id.Namespace),
@@ -169,7 +172,9 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) {
}

clenupCron := cron.New()
clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, cleanupJob)
for _, job := range cleanupJobs {
clenupCron.AddFunc(d.cfg.FileCleanupCronSpec, job)
}
clenupCron.Start()
defer clenupCron.Stop()
log.Info("start schedule cleanup expired files for storage sink",
@@ -186,9 +191,34 @@ func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) {
zap.Error(ctx.Err()))
}

func (d *DDLSink) runCleanup(ctx context.Context) func() {
func (d *DDLSink) runCleanup(ctx context.Context, uri *url.URL) ([]func(), error) {
ret := []func(){}
if uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" {
ret = append(ret, func() {
checkpointTs := d.lastCheckpointTs.Load()
start := time.Now()
cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path)
if err != nil {
log.Error("failed to remove empty dirs",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Duration("cost", time.Since(start)),
zap.Error(err),
)
return
}
log.Info("remove empty dirs",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("count", cnt),
zap.Duration("cost", time.Since(start)))
})
}

isCleanupRunning := atomic.Bool{}
return func() {
ret = append(ret, func() {
if !isCleanupRunning.CompareAndSwap(false, true) {
log.Warn("cleanup expired files is already running, skip this round",
zap.String("namespace", d.id.Namespace),
@@ -199,7 +229,7 @@ func (d *DDLSink) runCleanup(ctx context.Context) func() {
defer isCleanupRunning.Store(false)
start := time.Now()
checkpointTs := d.lastCheckpointTs.Load()
cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.storage, d.cfg, checkpointTs)
cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs)
if err != nil {
log.Error("failed to remove expired files",
zap.String("namespace", d.id.Namespace),
@@ -217,7 +247,8 @@ func (d *DDLSink) runCleanup(ctx context.Context) func() {
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("count", cnt),
zap.Duration("cost", time.Since(start)))
}
})
return ret, nil
}

// Close closes the sink.
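For context, here is a minimal standalone sketch of the scheduling pattern bgCleanup now uses: every cleanup job is registered on the same cron spec and fires independently. This assumes github.com/robfig/cron/v3; the "@every 1s" spec and the job bodies are placeholders, not the sink's real FileCleanupCronSpec or cleanup logic.

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	cleanupJobs := []func(){
		func() { fmt.Println("remove expired files") },
		func() { fmt.Println("remove empty dirs") },
	}
	c := cron.New()
	for _, job := range cleanupJobs {
		// "@every 1s" stands in for cfg.FileCleanupCronSpec.
		c.AddFunc("@every 1s", job)
	}
	c.Start()
	defer c.Stop()
	// Let the schedule fire a few times before exiting.
	time.Sleep(3 * time.Second)
}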
8 changes: 5 additions & 3 deletions cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
@@ -159,10 +159,12 @@ func TestCleanupExpiredFiles(t *testing.T) {
require.Nil(t, err)

cnt := 0
cleanupJob := func() {
cnt++
cleanupJobs := []func(){
func() {
cnt++
},
}
sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJob)
sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJobs)
require.Nil(t, err)

_ = sink
36 changes: 36 additions & 0 deletions pkg/sink/cloudstorage/path.go
@@ -17,7 +17,10 @@ import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
@@ -412,6 +415,7 @@ var dateSeparatorDayRegexp *regexp.Regexp
// RemoveExpiredFiles removes expired files from external storage.
func RemoveExpiredFiles(
ctx context.Context,
_ model.ChangeFeedID,
storage storage.ExternalStorage,
cfg *Config,
checkpointTs model.Ts,
@@ -439,3 +443,35 @@
}, nil)
return cnt, err
}

// RemoveEmptyDirs removes empty directories under the target path on the
// local file system and returns the number of directories removed.
func RemoveEmptyDirs(
ctx context.Context,
id model.ChangeFeedID,
target string,
) (uint64, error) {
cnt := uint64(0)
err := filepath.Walk(target, func(path string, info fs.FileInfo, err error) error {
if os.IsNotExist(err) || path == target || info == nil {
// if the path does not exist, return nil to continue the walk.
return nil
}
if err != nil {
return err
}
if info.IsDir() {
files, err := os.ReadDir(path)
if err == nil && len(files) == 0 {
log.Debug("Deleting empty directory",
zap.String("namespace", id.Namespace),
zap.String("changeFeedID", id.ID),
zap.String("path", path))
os.Remove(path)
cnt++
return filepath.SkipDir
}
}
return nil
})

return cnt, err
}
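A hedged sketch of how the new RemoveEmptyDirs helper could be exercised against a throwaway directory tree; the test name, directory layout, and import paths (assumed to be the pingcap/tiflow module) are illustrative only and not part of this commit.

package cloudstorage_test

import (
	"context"
	"os"
	"path/filepath"
	"testing"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
	"github.com/stretchr/testify/require"
)

func TestRemoveEmptyDirsSketch(t *testing.T) {
	dir := t.TempDir()
	// One empty date directory and one that still holds a data file.
	empty := filepath.Join(dir, "test", "tbl", "2022-01-01")
	nonEmpty := filepath.Join(dir, "test", "tbl", "2022-01-02")
	require.NoError(t, os.MkdirAll(empty, 0o755))
	require.NoError(t, os.MkdirAll(nonEmpty, 0o755))
	require.NoError(t, os.WriteFile(filepath.Join(nonEmpty, "1.data"), nil, 0o644))

	cnt, err := cloudstorage.RemoveEmptyDirs(context.Background(), model.DefaultChangeFeedID("test"), dir)
	require.NoError(t, err)
	// Only the empty 2022-01-01 directory is removed in this pass.
	require.Equal(t, uint64(1), cnt)
}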
2 changes: 1 addition & 1 deletion pkg/sink/cloudstorage/path_test.go
@@ -410,7 +410,7 @@ func TestRemoveExpiredFilesWithoutPartition(t *testing.T) {

currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC)
checkpointTs := oracle.GoTimeToTS(currTime)
cnt, err := RemoveExpiredFiles(ctx, storage, cfg, checkpointTs)
cnt, err := RemoveExpiredFiles(ctx, model.ChangeFeedID{}, storage, cfg, checkpointTs)
require.NoError(t, err)
require.Equal(t, uint64(16), cnt)
}
46 changes: 46 additions & 0 deletions tests/integration_tests/storage_cleanup/run.sh
@@ -8,6 +8,47 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function generate_single_table_files() {
local workdir=$1
local bucket=$2
local schema=$3
local table=$4
local day=$5
local file_cnt=$6

table_dir=$workdir/$bucket/$schema/$table/$day
mkdir -p $table_dir
for i in $(seq 1 $file_cnt); do
touch $table_dir/$i.data
done

mkdir -p $table_dir/meta
touch $table_dir/meta/CDC.index
}

function generate_historic_files() {
local target_bucket="storage_test"
# historic files of tables in schema.sql
generate_single_table_files $WORK_DIR $target_bucket test multi_data_type 2022-01-01 10
generate_single_table_files $WORK_DIR $target_bucket test multi_charset 2022-01-02 10
generate_single_table_files $WORK_DIR $target_bucket test binary_columns 2022-01-03 10

# historic files of tables in test but not in schema.sql
generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy 2022-01-01 10
generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy 2022-01-02 10
generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy 2022-01-03 10

# historic files of tables that belong to a different schema
generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type 2022-01-01 10
generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset 2022-01-02 10
generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns 2022-01-03 10

# historic files in different bucket, which should not be cleaned
generate_single_table_files $WORK_DIR storage_test2 test multi_data_type 2022-01-01 10
generate_single_table_files $WORK_DIR storage_test2 test multi_charset 2022-01-02 10
generate_single_table_files $WORK_DIR storage_test2 test binary_columns 2022-01-03 10
}

function run() {
if [ "$SINK_TYPE" != "storage" ]; then
return
@@ -18,6 +59,11 @@ function run() {
cd $WORK_DIR
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

generate_historic_files

# wait input
read -p "Press enter to continue"

SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml

