From 946adc2940efae4e8eb5ded98251b49a46834c28 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Sun, 26 Nov 2023 17:25:13 +0800 Subject: [PATCH] This is an automated cherry-pick of #10097 Signed-off-by: ti-chi-bot --- cdc/api/v2/model.go | 27 +++ .../cloudstorage/cloud_storage_ddl_sink.go | 174 +++++++++++++++++- .../cloud_storage_ddl_sink_test.go | 52 +++++- go.mod | 6 + go.sum | 7 + pkg/config/sink.go | 23 +++ pkg/sink/cloudstorage/config.go | 31 +++- pkg/sink/cloudstorage/path.go | 73 ++++++++ pkg/sink/cloudstorage/path_test.go | 138 ++++++++++++++ pkg/util/external_storage.go | 18 +- tests/integration_tests/run_group.sh | 6 + .../conf/changefeed-default.toml | 16 ++ .../storage_cleanup/conf/changefeed.toml | 22 +++ .../storage_cleanup/conf/diff_config.toml | 29 +++ .../storage_cleanup/data/data.sql | 84 +++++++++ .../storage_cleanup/data/schema.sql | 64 +++++++ .../integration_tests/storage_cleanup/run.sh | 120 ++++++++++++ 17 files changed, 875 insertions(+), 15 deletions(-) create mode 100644 tests/integration_tests/storage_cleanup/conf/changefeed-default.toml create mode 100644 tests/integration_tests/storage_cleanup/conf/changefeed.toml create mode 100644 tests/integration_tests/storage_cleanup/conf/diff_config.toml create mode 100644 tests/integration_tests/storage_cleanup/data/data.sql create mode 100644 tests/integration_tests/storage_cleanup/data/schema.sql create mode 100644 tests/integration_tests/storage_cleanup/run.sh diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 3206f2f1899..c322edba657 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -377,9 +377,18 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ +<<<<<<< HEAD WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, FlushInterval: 
c.Sink.CloudStorageConfig.FlushInterval, FileSize: c.Sink.CloudStorageConfig.FileSize, +======= + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) } } @@ -596,9 +605,18 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ +<<<<<<< HEAD WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, FileSize: cloned.Sink.CloudStorageConfig.FileSize, +======= + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) } } @@ -1023,9 +1041,18 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { +<<<<<<< HEAD WorkerCount *int `json:"worker_count,omitempty"` FlushInterval *string `json:"flush_interval,omitempty"` FileSize *int `json:"file_size,omitempty"` +======= + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int 
`json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 6fee98a900d..ad42548843c 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -17,9 +17,9 @@ import ( "context" "encoding/json" "net/url" + "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" timodel "github.com/pingcap/tidb/parser/model" @@ -27,9 +27,15 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" +<<<<<<< HEAD +======= + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/util" + "github.com/robfig/cron" "go.uber.org/zap" ) @@ -43,12 +49,42 @@ type DDLSink struct { // statistic is used to record the DDL metrics statistics *metrics.Statistics storage storage.ExternalStorage + cfg *cloudstorage.Config + cron *cron.Cron +<<<<<<< HEAD +======= + lastCheckpointTs atomic.Uint64 +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) lastSendCheckpointTsTime time.Time } // NewDDLSink creates a ddl sink for cloud storage. 
+<<<<<<< HEAD func NewDDLSink(ctx context.Context, sinkURI *url.URL) (*DDLSink, error) { +======= +func NewDDLSink(ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, +) (*DDLSink, error) { + return newDDLSink(ctx, changefeedID, sinkURI, replicaConfig, nil) +} + +func newDDLSink(ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + cleanupJobs []func(), /* only for test */ +) (*DDLSink, error) { + // create cloud storage config and then apply the params of sinkURI to it. + cfg := cloudstorage.NewConfig() + err := cfg.Apply(ctx, sinkURI, replicaConfig) + if err != nil { + return nil, errors.Trace(err) + } + +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String()) if err != nil { return nil, err @@ -58,10 +94,25 @@ func NewDDLSink(ctx context.Context, sinkURI *url.URL) (*DDLSink, error) { d := &DDLSink{ id: changefeedID, storage: storage, +<<<<<<< HEAD statistics: metrics.NewStatistics(ctx, sink.TxnSink), lastSendCheckpointTsTime: time.Now(), } +======= + statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink), + cfg: cfg, + lastSendCheckpointTsTime: time.Now(), + } + + if err := d.initCron(ctx, sinkURI, cleanupJobs); err != nil { + return nil, errors.Trace(err) + } + // Note: It is intended to run the cleanup goroutine in the background. + // we don't wait for it to finish since the goroutine would be stuck if + // the downstream is abnormal, especially when the downstream is a nfs. 
+ go d.bgCleanup(ctx) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) return d, nil } @@ -90,7 +141,11 @@ func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error } var def cloudstorage.TableDefinition +<<<<<<< HEAD def.FromDDLEvent(ddl) +======= + def.FromDDLEvent(ddl, d.cfg.OutputColumnID) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) if err := writeFile(def); err != nil { return errors.Trace(err) } @@ -98,7 +153,11 @@ func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error if ddl.Type == timodel.ActionExchangeTablePartition { // For exchange partition, we need to write the schema of the source table. var sourceTableDef cloudstorage.TableDefinition +<<<<<<< HEAD sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version) +======= + sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.cfg.OutputColumnID) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) return writeFile(sourceTableDef) } return nil @@ -117,6 +176,7 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, defer func() { d.lastSendCheckpointTsTime = time.Now() + d.lastCheckpointTs.Store(ts) }() ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts}) if err != nil { @@ -126,6 +186,118 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context, return errors.Trace(err) } +func (d *DDLSink) initCron( + ctx context.Context, sinkURI *url.URL, cleanupJobs []func(), +) (err error) { + if cleanupJobs == nil { + cleanupJobs = d.genCleanupJob(ctx, sinkURI) + } + + d.cron = cron.New() + for _, job := range cleanupJobs { + err = d.cron.AddFunc(d.cfg.FileCleanupCronSpec, job) + if err != nil { + return err + } + } + return nil +} + +func (d *DDLSink) bgCleanup(ctx context.Context) { + if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 { + log.Info("skip cleanup 
expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) + return + } + + d.cron.Start() + defer d.cron.Stop() + log.Info("start schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) + + // wait for the context done + <-ctx.Done() + log.Info("stop schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Error(ctx.Err())) +} + +func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) []func() { + ret := []func(){} + + isLocal := uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" + isRemoveEmptyDirsRuning := atomic.Bool{} + if isLocal { + ret = append(ret, func() { + if !isRemoveEmptyDirsRuning.CompareAndSwap(false, true) { + log.Warn("remove empty dirs is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + checkpointTs := d.lastCheckpointTs.Load() + start := time.Now() + cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path) + if err != nil { + log.Error("failed to remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + } + + isCleanupRunning := atomic.Bool{} + ret = append(ret, func() { + if !isCleanupRunning.CompareAndSwap(false, 
true) { + log.Warn("cleanup expired files is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + defer isCleanupRunning.Store(false) + start := time.Now() + checkpointTs := d.lastCheckpointTs.Load() + cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs) + if err != nil { + log.Error("failed to remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + return ret +} + // Close closes the sink. func (d *DDLSink) Close() { if d.statistics != nil { diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index 396c21eb188..221ff7ed59a 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -19,6 +19,7 @@ import ( "net/url" "os" "path" + "sync/atomic" "testing" "time" @@ -26,6 +27,8 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -33,10 +36,17 @@ func TestWriteDDLEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) +<<<<<<< HEAD sink, err := NewDDLSink(ctx, sinkURI) +======= + 
replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) require.Nil(t, err) ddlEvent := &model.DDLEvent{ @@ -97,10 +107,17 @@ func TestWriteCheckpointTs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) +<<<<<<< HEAD sink, err := NewDDLSink(ctx, sinkURI) +======= + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) require.Nil(t, err) tables := []*model.TableInfo{ { @@ -132,3 +149,34 @@ func TestWriteCheckpointTs(t *testing.T) { require.Nil(t, err) require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata)) } + +func TestCleanupExpiredFiles(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.Nil(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + + cnt := atomic.Int64{} + cleanupJobs := []func(){ + func() { + cnt.Add(1) + }, + } + sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJobs) + 
require.Nil(t, err) + + _ = sink + time.Sleep(3 * time.Second) + require.LessOrEqual(t, int64(1), cnt.Load()) +} diff --git a/go.mod b/go.mod index c2c8eb4a0cd..b4bb934d4c9 100644 --- a/go.mod +++ b/go.mod @@ -69,8 +69,14 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/r3labs/diff v1.1.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 +<<<<<<< HEAD github.com/segmentio/kafka-go v0.4.39-0.20230217181906-f6986fb02ee7 github.com/shirou/gopsutil/v3 v3.23.3 +======= + github.com/robfig/cron v1.2.0 + github.com/segmentio/kafka-go v0.4.41-0.20230526171612-f057b1d369cd + github.com/shirou/gopsutil/v3 v3.23.5 +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) github.com/shopspring/decimal v1.3.0 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.6.1 diff --git a/go.sum b/go.sum index bce4b089b9b..b2e39c04481 100644 --- a/go.sum +++ b/go.sum @@ -983,6 +983,13 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +<<<<<<< HEAD +======= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod 
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 4eb50ff215b..0d4b498f12a 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -248,6 +248,22 @@ func (d *DateSeparator) FromString(separator string) error { return nil } +// GetPattern returns the pattern of the date separator. +func (d DateSeparator) GetPattern() string { + switch d { + case DateSeparatorNone: + return "" + case DateSeparatorYear: + return `\d{4}` + case DateSeparatorMonth: + return `\d{4}-\d{2}` + case DateSeparatorDay: + return `\d{4}-\d{2}-\d{2}` + default: + return "" + } +} + func (d DateSeparator) String() string { switch d { case DateSeparatorNone: @@ -364,6 +380,13 @@ type CloudStorageConfig struct { WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` FileSize *int `toml:"file-size" json:"file-size,omitempty"` +<<<<<<< HEAD +======= + + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` + FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 6590996168a..56c1522dc6f 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -47,6 +47,12 @@ const ( minFileSize = 1024 * 1024 // the upper limit of file size maxFileSize = 512 * 1024 * 1024 + + // disable file cleanup by default + defaultFileExpirationDays = 0 + // Second | Minute | Hour | Dom | Month | DowOptional + // `0 0 2 * * *` means 2:00:00 AM every day + defaultFileCleanupCronSpec = "0 0 2 * * *" ) type urlConfig struct { @@ -62,15 +68,19 @@ type 
Config struct { FileSize int FileIndexWidth int DateSeparator string + FileExpirationDays int + FileCleanupCronSpec string EnablePartitionSeparator bool } // NewConfig returns the default cloud storage sink config. func NewConfig() *Config { return &Config{ - WorkerCount: defaultWorkerCount, - FlushInterval: defaultFlushInterval, - FileSize: defaultFileSize, + WorkerCount: defaultWorkerCount, + FlushInterval: defaultFlushInterval, + FileSize: defaultFileSize, + FileExpirationDays: defaultFileExpirationDays, + FileCleanupCronSpec: defaultFileCleanupCronSpec, } } @@ -110,9 +120,24 @@ func (c *Config) Apply( return err } +<<<<<<< HEAD c.DateSeparator = replicaConfig.Sink.DateSeparator c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth +======= + c.DateSeparator = util.GetOrZero(replicaConfig.Sink.DateSeparator) + c.EnablePartitionSeparator = util.GetOrZero(replicaConfig.Sink.EnablePartitionSeparator) + c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth) + if replicaConfig.Sink.CloudStorageConfig != nil { + c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil { + c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays + } + if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { + c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec + } + } +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index 448a6217dfc..9b1af8a0814 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -17,10 +17,14 @@ import ( "context" "fmt" "io" + 
"io/fs" + "os" "path" + "path/filepath" "regexp" "strconv" "strings" + "time" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" @@ -29,6 +33,8 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/util" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -402,3 +408,70 @@ func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, err return fileIdx, nil } + +var dateSeparatorDayRegexp *regexp.Regexp + +// RemoveExpiredFiles removes expired files from external storage. +func RemoveExpiredFiles( + ctx context.Context, + _ model.ChangeFeedID, + storage storage.ExternalStorage, + cfg *Config, + checkpointTs model.Ts, +) (uint64, error) { + if cfg.DateSeparator != config.DateSeparatorDay.String() { + return 0, nil + } + if dateSeparatorDayRegexp == nil { + dateSeparatorDayRegexp = regexp.MustCompile(config.DateSeparatorDay.GetPattern()) + } + + ttl := time.Duration(cfg.FileExpirationDays) * time.Hour * 24 + currTime := oracle.GetTimeFromTS(checkpointTs).Add(-ttl) + expiredDate := currTime.Format("2006-01-02") + + cnt := uint64(0) + err := util.RemoveFilesIf(ctx, storage, func(path string) bool { + // the path is like: /////CDC{num}.extension + match := dateSeparatorDayRegexp.FindString(path) + if match != "" && match < expiredDate { + cnt++ + return true + } + return false + }, nil) + return cnt, err +} + +// RemoveEmptyDirs removes empty directories from external storage. +func RemoveEmptyDirs( + ctx context.Context, + id model.ChangeFeedID, + target string, +) (uint64, error) { + cnt := uint64(0) + err := filepath.Walk(target, func(path string, info fs.FileInfo, err error) error { + if os.IsNotExist(err) || path == target || info == nil { + // if path not exists, we should return nil to continue. 
+ return nil + } + if err != nil { + return err + } + if info.IsDir() { + files, err := os.ReadDir(path) + if err == nil && len(files) == 0 { + log.Debug("Deleting empty directory", + zap.String("namespace", id.Namespace), + zap.String("changeFeedID", id.ID), + zap.String("path", path)) + os.Remove(path) + cnt++ + return filepath.SkipDir + } + } + return nil + }) + + return cnt, err +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 6a0dbe56c4a..cffe5e3257d 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FilePathGenerator { @@ -275,3 +276,140 @@ func TestIsSchemaFile(t *testing.T) { "testCase: %s, path: %v", tt.name, tt.path) } } +<<<<<<< HEAD +======= + +func TestCheckOrWriteSchema(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() + f := testFilePathGenerator(ctx, t, dir) + + var columns []*timodel.ColumnInfo + ft := types.NewFieldType(mysql.TypeLong) + ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag) + col := &timodel.ColumnInfo{ + Name: timodel.NewCIStr("Id"), + FieldType: *ft, + DefaultValue: 10, + } + columns = append(columns, col) + tableInfo := &model.TableInfo{ + TableInfo: &timodel.TableInfo{Columns: columns}, + Version: 100, + TableName: model.TableName{ + Schema: "test", + Table: "table1", + TableID: 20, + }, + } + + table := VersionedTableName{ + TableNameWithPhysicTableID: tableInfo.TableName, + TableInfoVersion: tableInfo.Version, + } + + err := f.CheckOrWriteSchema(ctx, table, tableInfo) + require.NoError(t, err) + require.Equal(t, tableInfo.Version, f.versionMap[table]) + + // test only table version changed, schema file should be 
reused + table.TableInfoVersion = 101 + err = f.CheckOrWriteSchema(ctx, table, tableInfo) + require.NoError(t, err) + require.Equal(t, tableInfo.Version, f.versionMap[table]) + + dir = filepath.Join(dir, "test/table1/meta") + cnt, err := os.ReadDir(dir) + require.NoError(t, err) + require.Equal(t, 1, len(cnt)) +} + +func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() + uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) + storage, err := util.GetExternalStorageFromURI(ctx, uri) + require.NoError(t, err) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.DateSeparator = util.AddressOf(config.DateSeparatorDay.String()) + replicaConfig.Sink.Protocol = util.AddressOf(config.ProtocolCsv.String()) + replicaConfig.Sink.FileIndexWidth = util.AddressOf(6) + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + cfg := NewConfig() + err = cfg.Apply(ctx, sinkURI, replicaConfig) + require.NoError(t, err) + + // generate some expired files + filesWithoutPartition := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-01/CDC000001.csv", + "schema1/table1/5/2021-01-01/CDC000002.csv", + "schema1/table1/5/2021-01-01/CDC000003.csv", + "schema1/table1/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma1-table2 + "schema1/table2/5/2021-01-01/CDC000001.csv", + "schema1/table2/5/2021-01-01/CDC000002.csv", + "schema1/table2/5/2021-01-01/CDC000003.csv", + "schema1/table2/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table2/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithoutPartition { + err := storage.WriteFile(ctx, 
file, []byte("test")) + require.NoError(t, err) + } + + filesWithPartition := []string{ + // schma1-table1 + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000001.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000002.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000003.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma2-table1 + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000001.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000002.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000003.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema2/table1/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesNotExpired := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-02/CDC000001.csv", + "schema1/table1/5/2021-01-02/CDC000002.csv", + "schema1/table1/5/2021-01-02/CDC000003.csv", + "schema1/table1/5/2021-01-02/" + defaultIndexFileName, // index + // schma1-table2 + "schema1/table2/5/2021-01-02/CDC000001.csv", + "schema1/table2/5/2021-01-02/CDC000002.csv", + "schema1/table2/5/2021-01-02/CDC000003.csv", + "schema1/table2/5/2021-01-02/" + defaultIndexFileName, // index + } + for _, file := range filesNotExpired { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC) + checkpointTs := oracle.GoTimeToTS(currTime) + cnt, err := RemoveExpiredFiles(ctx, model.ChangeFeedID{}, storage, cfg, checkpointTs) + require.NoError(t, err) + require.Equal(t, uint64(16), cnt) +} +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) diff --git 
a/pkg/util/external_storage.go b/pkg/util/external_storage.go index 3c4e69d6f05..34ec47fbd82 100644 --- a/pkg/util/external_storage.go +++ b/pkg/util/external_storage.go @@ -259,38 +259,38 @@ func RemoveFilesIf( } log.Debug("Removing files", zap.Any("toRemoveFiles", toRemoveFiles)) - - for _, path := range toRemoveFiles { - if err := extStorage.DeleteFile(ctx, path); err != nil { - return errors.ErrExternalStorageAPI.Wrap(err) - } - } return DeleteFilesInExtStorage(ctx, extStorage, toRemoveFiles) } // DeleteFilesInExtStorage deletes files in external storage concurrently. +// TODO: Add a test for this function to cover batch delete. func DeleteFilesInExtStorage( ctx context.Context, extStorage storage.ExternalStorage, toRemoveFiles []string, ) error { limit := make(chan struct{}, 32) + batch := 3000 eg, egCtx := errgroup.WithContext(ctx) - for _, file := range toRemoveFiles { + for len(toRemoveFiles) > 0 { select { case <-egCtx.Done(): return egCtx.Err() case limit <- struct{}{}: } - name := file + if len(toRemoveFiles) < batch { + batch = len(toRemoveFiles) + } + files := toRemoveFiles[:batch] eg.Go(func() error { defer func() { <-limit }() - err := extStorage.DeleteFile(egCtx, name) + err := extStorage.DeleteFiles(egCtx, files) if err != nil && !IsNotExistInExtStorage(err) { // if fail then retry, may end up with notExit err, ignore the error return errors.ErrExternalStorageAPI.Wrap(err) } return nil }) + toRemoveFiles = toRemoveFiles[batch:] } return eg.Wait() } diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 5c714f70351..0e34a98d1aa 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -18,8 +18,14 @@ kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics canal_json_handle_key_only open_protocol_handle_key_only" kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 
multi_tables_ddl_v2 multi_topics_v2" +<<<<<<< HEAD storage_only_csv="csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table storage_csv_update" storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table lossy_ddl" +======= +storage_only="lossy_ddl storage_csv_update" +storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" +storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table" +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) # Define groups # Note: If new group is added, the group name must also be added to CI diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml new file mode 100644 index 00000000000..74f1f934a8a --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml @@ -0,0 +1,16 @@ +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is used as the line terminator. The default value is empty. +terminator = "\n" +# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed.toml b/tests/integration_tests/storage_cleanup/conf/changefeed.toml new file mode 100644 index 00000000000..60663c5d577 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed.toml @@ -0,0 +1,22 @@ +[sink] +protocol = "csv" +# Line terminator. 
An empty value means "\r\n" (CRLF) is used as the line terminator. The default value is empty. +terminator = "\n" +# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true + +[sink.cloud-storage-config] +file-expiration-days = 1 +# Second | Minute | Hour | Dom | Month | DowOptional +# cleanup every second +file-cleanup-cron-spec = "* * * * * *" \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/conf/diff_config.toml b/tests/integration_tests/storage_cleanup/conf/diff_config.toml new file mode 100644 index 00000000000..4ff72cef8f9 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/storage_cleanup/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/storage_cleanup/data/data.sql b/tests/integration_tests/storage_cleanup/data/data.sql new file mode 100644 index 00000000000..cec3db763d5 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/data.sql @@ -0,0 +1,84 @@ +use `test`; +-- make sure `nullable` can be handled properly.
+INSERT INTO multi_data_type() VALUES (); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -1, 1, -129, 129, -65536, 65536, -16777216, 16777216, -2147483649, 2147483649 + , true, 123.456, 123.123, 123456789012.123456789012 + , '测', '测试', x'89504E470D0A1A0A', x'89504E470D0A1A0A', '测试tinytext', '测试text', '测试mediumtext', '测试longtext' + , 'tinyblob', 'blob', 'mediumblob', 'longblob' + , '1977-01-01', '9999-12-31 23:59:59', '19731230153000', '23:59:59', 2022 + , 'enum2', 1 + , 'a,b', NULL); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -2, 2, -130, 130, -65537, 65537, -16777217, 16777217, -2147483650, 2147483650 + , false, 123.4567, 123.1237, 123456789012.1234567890127 + , '2', '测试2', x'89504E470D0A1A0B', x'89504E470D0A1A0B', '测试2tinytext', '测试2text', '测试2mediumtext', '测试longtext' + , 'tinyblob2', 'blob2', 'mediumblob2', 'longblob2' + , '2021-01-01', '2021-12-31 23:59:59', '19731230153000', '22:59:59', 2021 + , 'enum1', 2 + , 'a,b,c', '{ + "id": 1, + "name": "hello" + }'); + +UPDATE multi_data_type +SET t_boolean = false +WHERE id = 1; + +DELETE +FROM multi_data_type +WHERE id = 3; + +INSERT INTO multi_charset +VALUES (1, '测试', "中国", "上海", 
"你好,世界" + , 0xC4E3BAC3CAC0BDE7); + +INSERT INTO multi_charset +VALUES (2, '部署', "美国", "纽约", "世界,你好" + , 0xCAC0BDE7C4E3BAC3); + +UPDATE multi_charset +SET name = '开发' +WHERE name = '测试'; + +DELETE FROM multi_charset +WHERE name = '部署' + AND country = '美国' + AND city = '纽约' + AND description = '世界,你好'; + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + 
x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + 
x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); diff --git a/tests/integration_tests/storage_cleanup/data/schema.sql b/tests/integration_tests/storage_cleanup/data/schema.sql new file mode 100644 index 00000000000..d6934693e4e --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/schema.sql @@ -0,0 +1,64 @@ +USE `test`; + +CREATE TABLE multi_data_type +( + id INT AUTO_INCREMENT, + t_tinyint TINYINT, + t_tinyint_unsigned TINYINT UNSIGNED, + t_smallint SMALLINT, + 
t_smallint_unsigned SMALLINT UNSIGNED, + t_mediumint MEDIUMINT, + t_mediumint_unsigned MEDIUMINT UNSIGNED, + t_int INT, + t_int_unsigned INT UNSIGNED, + t_bigint BIGINT, + t_bigint_unsigned BIGINT UNSIGNED, + t_boolean BOOLEAN, + t_float FLOAT(6, 2), + t_double DOUBLE(6, 2), + t_decimal DECIMAL(38, 19), + t_char CHAR, + t_varchar VARCHAR(10), + c_binary binary(16), + c_varbinary varbinary(16), + t_tinytext TINYTEXT, + t_text TEXT, + t_mediumtext MEDIUMTEXT, + t_longtext LONGTEXT, + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + t_date DATE, + t_datetime DATETIME, + t_timestamp TIMESTAMP NULL, + t_time TIME, + t_year YEAR, + t_enum ENUM ('enum1', 'enum2', 'enum3'), + t_set SET ('a', 'b', 'c'), + t_bit BIT(64), + t_json JSON, + PRIMARY KEY (id) +); + +CREATE TABLE multi_charset ( + id INT, + name varchar(128) CHARACTER SET gbk, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob, + PRIMARY KEY (id) +) ENGINE = InnoDB CHARSET = utf8mb4; + +CREATE TABLE binary_columns +( + id INT AUTO_INCREMENT, + c_binary binary(255), + c_varbinary varbinary(255), + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + PRIMARY KEY (id) +); \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh new file mode 100644 index 00000000000..0e724226820 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -0,0 +1,120 @@ +#!/bin/bash + +set -eux + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +EXIST_FILES=() +CLEANED_FILES=() +function generate_single_table_files() { + local workdir=$1 + local bucket=$2 + local schema=$3 + local table=$4 + local day=$5 + local file_cnt=$6 + local should_clean=$7 # true or false + + 
table_dir=$workdir/$bucket/$schema/$table/$day + mkdir -p $table_dir + for i in $(seq 1 $file_cnt); do + file=$table_dir/$i.data + touch $file + if [ "$should_clean" == "true" ]; then + CLEANED_FILES+=($file) + else + EXIST_FILES+=($file) + fi + done + + mkdir -p $table_dir/meta + touch $table_dir/meta/CDC.index +} + +function generate_historic_files() { + local target_bucket="storage_test" + yesterday=$(date -d "yesterday" +"%Y-%m-%d") # should not be cleaned since file-expiration-days is 1 + day_before_yesterday=$(date -d "2 days ago" +"%Y-%m-%d") # should be cleaned + + # historic files of tables in schema.sql + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type $yesterday 10 false + generate_single_table_files $WORK_DIR $target_bucket test multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns $day_before_yesterday 10 true + + # historic files of tables in test but not in schema.sql + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy $yesterday 10 false + + # historic files of tables that belong to a different schema + generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns $yesterday 10 false + + # historic files in default bucket, which should not be cleaned + generate_single_table_files $WORK_DIR storage_test_default test multi_data_type 2022-01-01 10 false + generate_single_table_files $WORK_DIR storage_test_default test multi_charset 2022-01-02 10 false + generate_single_table_files $WORK_DIR
storage_test_default test binary_columns 2022-01-03 10 false +} + +function check_file_exists() { + local all_should_exist=$1 + for f in ${EXIST_FILES[@]}; do + if [ ! -f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + done + + for f in ${CLEANED_FILES[@]}; do + if [ "$all_should_exist" == "true" ]; then + if [ ! -f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + else + if [ -f $f ]; then + echo "file $f should not exist but exists" + exit 1 + fi + fi + done +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + generate_historic_files + + SINK_URI_DEFAULT="file://$WORK_DIR/storage_test_default?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI_DEFAULT" -c "default-config-test" --config=$CUR/conf/changefeed-default.toml + sleep 20 + check_file_exists true + + SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml + sleep 20 + check_file_exists false + + run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"