diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 267e5b6cef0..83dff1266d3 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -336,6 +336,18 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( LargeMessageHandle: largeMessageHandle, } } + + if c.Sink.CloudStorageConfig != nil { + res.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + } + } } if c.Mounter != nil { res.Mounter = &config.MounterConfig{ @@ -465,6 +477,18 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { LargeMessageHandle: largeMessageHandle, } } + + if cloned.Sink.CloudStorageConfig != nil { + res.Sink.CloudStorageConfig = &CloudStorageConfig{ + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + } + } } if cloned.Consistent != nil { res.Consistent = &ConsistentConfig{ @@ -586,19 +610,20 @@ type Table struct { // SinkConfig represents sink config for a changefeed // This is a duplicate of config.SinkConfig type SinkConfig struct { - Protocol string `json:"protocol"` - SchemaRegistry string `json:"schema_registry"` - CSVConfig *CSVConfig `json:"csv"` - DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` - ColumnSelectors []*ColumnSelector `json:"column_selectors"` - TxnAtomicity string `json:"transaction_atomicity"` - EncoderConcurrency int `json:"encoder_concurrency"` - Terminator string `json:"terminator"` - DateSeparator string `json:"date_separator"` - EnablePartitionSeparator bool `json:"enable_partition_separator"` - FileIndexWidth int `json:"file_index_width"` - KafkaConfig *KafkaConfig `json:"kafka_config"` - AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"` + Protocol string `json:"protocol"` + SchemaRegistry string `json:"schema_registry"` + CSVConfig *CSVConfig `json:"csv"` + DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` + ColumnSelectors []*ColumnSelector `json:"column_selectors"` + TxnAtomicity string `json:"transaction_atomicity"` + EncoderConcurrency int `json:"encoder_concurrency"` + Terminator string `json:"terminator"` + DateSeparator string `json:"date_separator"` + EnablePartitionSeparator bool `json:"enable_partition_separator"` + FileIndexWidth int `json:"file_index_width"` + KafkaConfig *KafkaConfig `json:"kafka_config"` + CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"` + AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"` } // KafkaConfig represents kafka config for a changefeed. @@ -615,6 +640,17 @@ type KafkaConfig struct { LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` } +// CloudStorageConfig represents a cloud storage sink configuration +type CloudStorageConfig struct { + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` +} + // CSVConfig denotes the csv config // This is the same as config.CSVConfig type CSVConfig struct { diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index 712147aa418..b359d26d1b0 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -326,12 +326,14 @@ func TestGCAndCleanup(t *testing.T) { cancel() require.ErrorIs(t, eg.Wait(), context.Canceled) - m.Cleanup(ctx) - ret, err := extStorage.FileExists(ctx, getDeletedChangefeedMarker(changefeedID)) + clenupCtx, clenupCancel := context.WithCancel(context.Background()) + defer clenupCancel() + m.Cleanup(clenupCtx) + ret, err := extStorage.FileExists(clenupCtx, getDeletedChangefeedMarker(changefeedID)) require.NoError(t, err) require.True(t, ret) cnt := 0 - extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + extStorage.WalkDir(clenupCtx, nil, func(path string, size int64) error { cnt++ return nil }) diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 0d8f24bd5d7..9a2bf1789a8 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -17,9 +17,9 @@ import ( "context" "encoding/json" "net/url" + "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" timodel "github.com/pingcap/tidb/parser/model" @@ -27,45 +27,78 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink" "github.com/pingcap/tiflow/cdc/sinkv2/metrics" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/util" + "github.com/robfig/cron" "go.uber.org/zap" ) // Assert DDLEventSink implementation -var _ ddlsink.DDLEventSink = (*ddlSink)(nil) +var _ ddlsink.DDLEventSink = (*DDLSink)(nil) -type ddlSink struct { +// DDLSink is a sink that writes ddl events to cloud storage. +type DDLSink struct { // id indicates which changefeed this sink belongs to. id model.ChangeFeedID // statistic is used to record the DDL metrics statistics *metrics.Statistics storage storage.ExternalStorage + cfg *cloudstorage.Config + cron *cron.Cron + lastCheckpointTs atomic.Uint64 lastSendCheckpointTsTime time.Time } -// NewCloudStorageDDLSink creates a ddl sink for cloud storage. -func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) { +// NewDDLSink creates a ddl sink for cloud storage. +func NewDDLSink(ctx context.Context, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, +) (*DDLSink, error) { + return newDDLSink(ctx, sinkURI, replicaConfig, nil) +} + +func newDDLSink(ctx context.Context, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + cleanupJobs []func(), /* only for test */ +) (*DDLSink, error) { + // create cloud storage config and then apply the params of sinkURI to it. + cfg := cloudstorage.NewConfig() + err := cfg.Apply(ctx, sinkURI, replicaConfig) + if err != nil { + return nil, errors.Trace(err) + } + storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String()) if err != nil { return nil, err } changefeedID := contextutil.ChangefeedIDFromCtx(ctx) - d := &ddlSink{ + d := &DDLSink{ id: changefeedID, storage: storage, statistics: metrics.NewStatistics(ctx, sink.TxnSink), + cfg: cfg, lastSendCheckpointTsTime: time.Now(), } + if err := d.initCron(ctx, sinkURI, cleanupJobs); err != nil { + return nil, errors.Trace(err) + } + // Note: It is intended to run the cleanup goroutine in the background. + // we don't wait for it to finish since the gourotine would be stuck if + // the downstream is abnormal, especially when the downstream is a nfs. + go d.bgCleanup(ctx) return d, nil } // WriteDDLEvent writes the ddl event to the cloud storage. -func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { +func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { writeFile := func(def cloudstorage.TableDefinition) error { encodedDef, err := def.MarshalWithQuery() if err != nil { @@ -103,7 +136,8 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error return nil } -func (d *ddlSink) WriteCheckpointTs(ctx context.Context, +// WriteCheckpointTs writes a checkpoint timestamp to the sink. +func (d *DDLSink) WriteCheckpointTs(ctx context.Context, ts uint64, tables []*model.TableInfo, ) error { if time.Since(d.lastSendCheckpointTsTime) < 2*time.Second { @@ -115,6 +149,7 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context, defer func() { d.lastSendCheckpointTsTime = time.Now() + d.lastCheckpointTs.Store(ts) }() ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts}) if err != nil { @@ -124,7 +159,120 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context, return errors.Trace(err) } -func (d *ddlSink) Close() error { +func (d *DDLSink) initCron( + ctx context.Context, sinkURI *url.URL, cleanupJobs []func(), +) (err error) { + if cleanupJobs == nil { + cleanupJobs = d.genCleanupJob(ctx, sinkURI) + } + + d.cron = cron.New() + for _, job := range cleanupJobs { + err = d.cron.AddFunc(d.cfg.FileCleanupCronSpec, job) + if err != nil { + return err + } + } + return nil +} + +func (d *DDLSink) bgCleanup(ctx context.Context) { + if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 { + log.Info("skip cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) + return + } + + d.cron.Start() + defer d.cron.Stop() + log.Info("start schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) + + // wait for the context done + <-ctx.Done() + log.Info("stop schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Error(ctx.Err())) +} + +func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) []func() { + ret := []func(){} + + isLocal := uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" + isRemoveEmptyDirsRuning := atomic.Bool{} + if isLocal { + ret = append(ret, func() { + if !isRemoveEmptyDirsRuning.CompareAndSwap(false, true) { + log.Warn("remove empty dirs is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + checkpointTs := d.lastCheckpointTs.Load() + start := time.Now() + cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path) + if err != nil { + log.Error("failed to remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + } + + isCleanupRunning := atomic.Bool{} + ret = append(ret, func() { + if !isCleanupRunning.CompareAndSwap(false, true) { + log.Warn("cleanup expired files is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + defer isCleanupRunning.Store(false) + start := time.Now() + checkpointTs := d.lastCheckpointTs.Load() + cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs) + if err != nil { + log.Error("failed to remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + return ret +} + +// Close closes the sink. +func (d *DDLSink) Close() error { if d.statistics != nil { d.statistics.Close() } diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index 23c10e83bea..77488d4c917 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -19,6 +19,7 @@ import ( "net/url" "os" "path" + "sync/atomic" "testing" "time" @@ -26,6 +27,8 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -33,10 +36,13 @@ func TestWriteDDLEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) - sink, err := NewCloudStorageDDLSink(ctx, sinkURI) + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, sinkURI, replicaConfig) require.Nil(t, err) ddlEvent := &model.DDLEvent{ @@ -97,10 +103,13 @@ func TestWriteCheckpointTs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) - sink, err := NewCloudStorageDDLSink(ctx, sinkURI) + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, sinkURI, replicaConfig) require.Nil(t, err) tables := []*model.TableInfo{ { @@ -132,3 +141,34 @@ func TestWriteCheckpointTs(t *testing.T) { require.Nil(t, err) require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata)) } + +func TestCleanupExpiredFiles(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.Nil(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + + cnt := atomic.Int64{} + cleanupJobs := []func(){ + func() { + cnt.Add(1) + }, + } + sink, err := newDDLSink(ctx, sinkURI, replicaConfig, cleanupJobs) + require.Nil(t, err) + + _ = sink + time.Sleep(3 * time.Second) + require.LessOrEqual(t, int64(1), cnt.Load()) +} diff --git a/cdc/sinkv2/ddlsink/factory/factory.go b/cdc/sinkv2/ddlsink/factory/factory.go index 701301aeb89..b9451b2abd9 100644 --- a/cdc/sinkv2/ddlsink/factory/factory.go +++ b/cdc/sinkv2/ddlsink/factory/factory.go @@ -49,7 +49,7 @@ func New( case sink.MySQLSSLScheme, sink.MySQLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: return mysql.NewMySQLDDLSink(ctx, sinkURI, cfg) case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: - return cloudstorage.NewCloudStorageDDLSink(ctx, sinkURI) + return cloudstorage.NewDDLSink(ctx, sinkURI, cfg) default: return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema) diff --git a/go.mod b/go.mod index 6020e112874..0f1ed91d680 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,8 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/r3labs/diff v1.1.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 - github.com/shirou/gopsutil/v3 v3.23.1 + github.com/robfig/cron v1.2.0 + github.com/shirou/gopsutil/v3 v3.23.5 github.com/shopspring/decimal v1.3.0 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.6.1 @@ -241,6 +242,7 @@ require ( github.com/rivo/uniseg v0.4.2 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rs/cors v1.7.0 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect @@ -266,7 +268,7 @@ require ( github.com/xdg/stringprep v1.0.3 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/xitongsys/parquet-go v1.6.0 // indirect - github.com/yusufpapurcu/wmi v1.2.2 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/v2 v2.305.4 // indirect go.opencensus.io v0.24.0 // indirect diff --git a/go.sum b/go.sum index 67dffa10fa5..2089a4dd2e4 100644 --- a/go.sum +++ b/go.sum @@ -1110,6 +1110,8 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8= github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1137,8 +1139,12 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX github.com/shhdgit/testfixtures/v3 v3.6.2-0.20211219171712-c4f264d673d3/go.mod h1:Z0OLtuFJ7Y4yLsVijHK8uq95NjGFlYJy+I00ElAEtUQ= github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA= -github.com/shirou/gopsutil/v3 v3.23.1 h1:a9KKO+kGLKEvcPIs4W62v0nu3sciVDOOOPUD0Hz7z/4= -github.com/shirou/gopsutil/v3 v3.23.1/go.mod h1:NN6mnm5/0k8jw4cBfCnJtr5L7ErOTg18tMNpgFkn0hA= +github.com/shirou/gopsutil/v3 v3.23.5 h1:5SgDCeQ0KW0S4N0znjeM/eFHXXOKyv2dVNgRq/c9P6Y= +github.com/shirou/gopsutil/v3 v3.23.5/go.mod h1:Ng3Maa27Q2KARVJ0SPZF5NdrQSC3XHKP8IIWrHgMeLY= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v1.3.0 h1:KK3gWIXskZ2O1U/JNTisNcvH+jveJxZYrjbTsrbbnh8= github.com/shopspring/decimal v1.3.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= @@ -1318,8 +1324,9 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -1704,8 +1711,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220909162455-aba9fc2a8ff2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/pkg/config/sink.go b/pkg/config/sink.go index a796f8372ac..9f73988dd32 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -131,7 +131,8 @@ type SinkConfig struct { EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"` FileIndexWidth int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"` - KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` + KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` + CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"` // TiDBSourceID is the source ID of the upstream TiDB, // which is used to set the `tidb_cdc_write_source` session variable. @@ -253,6 +254,22 @@ func (d *DateSeparator) FromString(separator string) error { return nil } +// GetPattern returns the pattern of the date separator. +func (d DateSeparator) GetPattern() string { + switch d { + case DateSeparatorNone: + return "" + case DateSeparatorYear: + return `\d{4}` + case DateSeparatorMonth: + return `\d{4}-\d{2}` + case DateSeparatorDay: + return `\d{4}-\d{2}-\d{2}` + default: + return "" + } +} + func (d DateSeparator) String() string { switch d { case DateSeparatorNone: @@ -542,3 +559,15 @@ func (c *LargeMessageHandleConfig) Disabled() bool { } return c.LargeMessageHandleOption == LargeMessageHandleOptionNone } + +// CloudStorageConfig represents a cloud storage sink configuration +type CloudStorageConfig struct { + WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` + FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` + FileSize *int `toml:"file-size" json:"file-size,omitempty"` + + FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` + FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` +} diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 75cd00e186b..918058bdc91 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" psink "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -39,12 +40,24 @@ const ( minFlushInterval = 2 * time.Second // the upper limit of flush-interval. maxFlushInterval = 10 * time.Minute + // defaultFlushConcurrency is the default value of flush-concurrency. + defaultFlushConcurrency = 1 + // the lower limit of flush-concurrency. + minFlushConcurrency = 1 + // the upper limit of flush-concurrency. + maxFlushConcurrency = 512 // defaultFileSize is the default value of file-size. defaultFileSize = 64 * 1024 * 1024 // the lower limit of file size minFileSize = 1024 * 1024 // the upper limit of file size maxFileSize = 512 * 1024 * 1024 + + // disable file cleanup by default + defaultFileExpirationDays = 0 + // Second | Minute | Hour | Dom | Month | DowOptional + // `0 0 2 * * ?` means 2:00:00 AM every day + defaultFileCleanupCronSpec = "0 0 2 * * *" ) // Config is the configuration for cloud storage sink. @@ -54,15 +67,21 @@ type Config struct { FileSize int FileIndexWidth int DateSeparator string + FileExpirationDays int + FileCleanupCronSpec string EnablePartitionSeparator bool + OutputColumnID bool + FlushConcurrency int } // NewConfig returns the default cloud storage sink config. func NewConfig() *Config { return &Config{ - WorkerCount: defaultWorkerCount, - FlushInterval: defaultFlushInterval, - FileSize: defaultFileSize, + WorkerCount: defaultWorkerCount, + FlushInterval: defaultFlushInterval, + FileSize: defaultFileSize, + FileExpirationDays: defaultFileExpirationDays, + FileCleanupCronSpec: defaultFileCleanupCronSpec, } } @@ -98,10 +117,23 @@ func (c *Config) Apply( c.DateSeparator = replicaConfig.Sink.DateSeparator c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth + if replicaConfig.Sink.CloudStorageConfig != nil { + c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency) + if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil { + c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays + } + if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { + c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec + } + } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth } + if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency { + c.FlushConcurrency = defaultFlushConcurrency + } return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 994219c74b5..906131965f6 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -31,6 +31,7 @@ func TestConfigApply(t *testing.T) { expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = config.DateSeparatorNone.String() expected.EnablePartitionSeparator = true + expected.FlushConcurrency = 1 uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err) diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index 6ca2844a762..d2ea2545a82 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -17,10 +17,14 @@ import ( "context" "fmt" "io" + "io/fs" + "os" "path" + "path/filepath" "regexp" "strconv" "strings" + "time" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" @@ -29,6 +33,8 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/util" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -403,3 +409,70 @@ func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, err return fileIdx, nil } + +var dateSeparatorDayRegexp *regexp.Regexp + +// RemoveExpiredFiles removes expired files from external storage. +func RemoveExpiredFiles( + ctx context.Context, + _ model.ChangeFeedID, + storage storage.ExternalStorage, + cfg *Config, + checkpointTs model.Ts, +) (uint64, error) { + if cfg.DateSeparator != config.DateSeparatorDay.String() { + return 0, nil + } + if dateSeparatorDayRegexp == nil { + dateSeparatorDayRegexp = regexp.MustCompile(config.DateSeparatorDay.GetPattern()) + } + + ttl := time.Duration(cfg.FileExpirationDays) * time.Hour * 24 + currTime := oracle.GetTimeFromTS(checkpointTs).Add(-ttl) + expiredDate := currTime.Format("2006-01-02") + + cnt := uint64(0) + err := util.RemoveFilesIf(ctx, storage, func(path string) bool { + // the path is like: /////CDC{num}.extension + match := dateSeparatorDayRegexp.FindString(path) + if match != "" && match < expiredDate { + cnt++ + return true + } + return false + }, nil) + return cnt, err +} + +// RemoveEmptyDirs removes empty directories from external storage. +func RemoveEmptyDirs( + ctx context.Context, + id model.ChangeFeedID, + target string, +) (uint64, error) { + cnt := uint64(0) + err := filepath.Walk(target, func(path string, info fs.FileInfo, err error) error { + if os.IsNotExist(err) || path == target || info == nil { + // if path not exists, we should return nil to continue. + return nil + } + if err != nil { + return err + } + if info.IsDir() { + files, err := os.ReadDir(path) + if err == nil && len(files) == 0 { + log.Debug("Deleting empty directory", + zap.String("namespace", id.Namespace), + zap.String("changeFeedID", id.ID), + zap.String("path", path)) + os.Remove(path) + cnt++ + return filepath.SkipDir + } + } + return nil + }) + + return cnt, err +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index ead5254fff4..03a5ba55bc7 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FilePathGenerator { @@ -328,3 +329,89 @@ func TestCheckOrWriteSchema(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(cnt)) } + +func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() + uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) + storage, err := util.GetExternalStorageFromURI(ctx, uri) + require.NoError(t, err) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String() + replicaConfig.Sink.Protocol = config.ProtocolCsv.String() + replicaConfig.Sink.FileIndexWidth = 6 + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + cfg := NewConfig() + err = cfg.Apply(ctx, sinkURI, replicaConfig) + require.NoError(t, err) + + // generate some expired files + filesWithoutPartition := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-01/CDC000001.csv", + "schema1/table1/5/2021-01-01/CDC000002.csv", + "schema1/table1/5/2021-01-01/CDC000003.csv", + "schema1/table1/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma1-table2 + "schema1/table2/5/2021-01-01/CDC000001.csv", + "schema1/table2/5/2021-01-01/CDC000002.csv", + "schema1/table2/5/2021-01-01/CDC000003.csv", + "schema1/table2/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table2/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithoutPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesWithPartition := []string{ + // schma1-table1 + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000001.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000002.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000003.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma2-table1 + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000001.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000002.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000003.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema2/table1/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesNotExpired := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-02/CDC000001.csv", + "schema1/table1/5/2021-01-02/CDC000002.csv", + "schema1/table1/5/2021-01-02/CDC000003.csv", + "schema1/table1/5/2021-01-02/" + defaultIndexFileName, // index + // schma1-table2 + "schema1/table2/5/2021-01-02/CDC000001.csv", + "schema1/table2/5/2021-01-02/CDC000002.csv", + "schema1/table2/5/2021-01-02/CDC000003.csv", + "schema1/table2/5/2021-01-02/" + defaultIndexFileName, // index + } + for _, file := range filesNotExpired { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC) + checkpointTs := oracle.GoTimeToTS(currTime) + cnt, err := RemoveExpiredFiles(ctx, model.ChangeFeedID{}, storage, cfg, checkpointTs) + require.NoError(t, err) + require.Equal(t, uint64(16), cnt) +} diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go index 3c4e69d6f05..a092cd7be2f 100644 --- a/pkg/util/external_storage.go +++ b/pkg/util/external_storage.go @@ -259,38 +259,40 @@ func RemoveFilesIf( } log.Debug("Removing files", zap.Any("toRemoveFiles", toRemoveFiles)) - - for _, path := range toRemoveFiles { - if err := extStorage.DeleteFile(ctx, path); err != nil { - return errors.ErrExternalStorageAPI.Wrap(err) - } - } return DeleteFilesInExtStorage(ctx, extStorage, toRemoveFiles) } // DeleteFilesInExtStorage deletes files in external storage concurrently. +// TODO: Add a test for this function to cover batch delete. func DeleteFilesInExtStorage( ctx context.Context, extStorage storage.ExternalStorage, toRemoveFiles []string, ) error { limit := make(chan struct{}, 32) + batch := 3000 eg, egCtx := errgroup.WithContext(ctx) - for _, file := range toRemoveFiles { + for len(toRemoveFiles) > 0 { select { case <-egCtx.Done(): return egCtx.Err() case limit <- struct{}{}: } - name := file + if len(toRemoveFiles) < batch { + batch = len(toRemoveFiles) + } + files := toRemoveFiles[:batch] eg.Go(func() error { defer func() { <-limit }() - err := extStorage.DeleteFile(egCtx, name) - if err != nil && !IsNotExistInExtStorage(err) { - // if fail then retry, may end up with notExit err, ignore the error - return errors.ErrExternalStorageAPI.Wrap(err) + for _, file := range files { + err := extStorage.DeleteFile(egCtx, file) + if err != nil && !IsNotExistInExtStorage(err) { + // if fail then retry, may end up with notExit err, ignore the error + return errors.ErrExternalStorageAPI.Wrap(err) + } } return nil }) + toRemoveFiles = toRemoveFiles[batch:] } return eg.Wait() } diff --git a/pkg/util/pointer.go b/pkg/util/pointer.go new file mode 100644 index 00000000000..c047548b88f --- /dev/null +++ b/pkg/util/pointer.go @@ -0,0 +1,27 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +// GetOrZero returns the value pointed to by p, or a zero value of +// its type if p is nil. +func GetOrZero[T any](p *T) T { + var val T + if p == nil { + return val + } + return *p +} + +// AddressOf return the address of the given input variable. +func AddressOf[T any](v T) *T { return &v } diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index f17ca2477f7..f92be287dee 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -19,7 +19,7 @@ kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic canal_jso kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" storage_only="lossy_ddl storage_csv_update" -storage_only_csv="csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" +storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table" # Define groups diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml new file mode 100644 index 00000000000..74f1f934a8a --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml @@ -0,0 +1,16 @@ +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed.toml b/tests/integration_tests/storage_cleanup/conf/changefeed.toml new file mode 100644 index 00000000000..60663c5d577 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed.toml @@ -0,0 +1,22 @@ +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true + +[sink.cloud-storage-config] +file-expiration-days = 1 +# Second | Minute | Hour | Dom | Month | DowOptional +# cleanup every second +file-cleanup-cron-spec = "* * * * * *" \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/conf/diff_config.toml b/tests/integration_tests/storage_cleanup/conf/diff_config.toml new file mode 100644 index 00000000000..4ff72cef8f9 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/storage_cleanup/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/storage_cleanup/data/data.sql b/tests/integration_tests/storage_cleanup/data/data.sql new file mode 100644 index 00000000000..cec3db763d5 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/data.sql @@ -0,0 +1,84 @@ +use `test`; +-- make sure `nullable` can be handled properly. +INSERT INTO multi_data_type() VALUES (); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -1, 1, -129, 129, -65536, 65536, -16777216, 16777216, -2147483649, 2147483649 + , true, 123.456, 123.123, 123456789012.123456789012 + , '测', '测试', x'89504E470D0A1A0A', x'89504E470D0A1A0A', '测试tinytext', '测试text', '测试mediumtext', '测试longtext' + , 'tinyblob', 'blob', 'mediumblob', 'longblob' + , '1977-01-01', '9999-12-31 23:59:59', '19731230153000', '23:59:59', 2022 + , 'enum2', 1 + , 'a,b', NULL); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -2, 2, -130, 130, -65537, 65537, -16777217, 16777217, -2147483650, 2147483650 + , false, 123.4567, 123.1237, 123456789012.1234567890127 + , '2', '测试2', x'89504E470D0A1A0B', x'89504E470D0A1A0B', '测试2tinytext', '测试2text', '测试2mediumtext', '测试longtext' + , 'tinyblob2', 'blob2', 'mediumblob2', 'longblob2' + , '2021-01-01', '2021-12-31 23:59:59', '19731230153000', '22:59:59', 2021 + , 'enum1', 2 + , 'a,b,c', '{ + "id": 1, + "name": "hello" + }'); + +UPDATE multi_data_type +SET t_boolean = false +WHERE id = 1; + +DELETE +FROM multi_data_type +WHERE id = 3; + +INSERT INTO multi_charset +VALUES (1, '测试', "中国", "上海", "你好,世界" + , 0xC4E3BAC3CAC0BDE7); + +INSERT INTO multi_charset +VALUES (2, '部署', "美国", "纽约", "世界,你好" + , 0xCAC0BDE7C4E3BAC3); + +UPDATE multi_charset +SET name = '开发' +WHERE name = '测试'; + +DELETE FROM multi_charset +WHERE name = '部署' + AND country = '美国' + AND city = '纽约' + AND description = '世界,你好'; + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); diff --git a/tests/integration_tests/storage_cleanup/data/schema.sql b/tests/integration_tests/storage_cleanup/data/schema.sql new file mode 100644 index 00000000000..d6934693e4e --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/schema.sql @@ -0,0 +1,64 @@ +USE `test`; + +CREATE TABLE multi_data_type +( + id INT AUTO_INCREMENT, + t_tinyint TINYINT, + t_tinyint_unsigned TINYINT UNSIGNED, + t_smallint SMALLINT, + t_smallint_unsigned SMALLINT UNSIGNED, + t_mediumint MEDIUMINT, + t_mediumint_unsigned MEDIUMINT UNSIGNED, + t_int INT, + t_int_unsigned INT UNSIGNED, + t_bigint BIGINT, + t_bigint_unsigned BIGINT UNSIGNED, + t_boolean BOOLEAN, + t_float FLOAT(6, 2), + t_double DOUBLE(6, 2), + t_decimal DECIMAL(38, 19), + t_char CHAR, + t_varchar VARCHAR(10), + c_binary binary(16), + c_varbinary varbinary(16), + t_tinytext TINYTEXT, + t_text TEXT, + t_mediumtext MEDIUMTEXT, + t_longtext LONGTEXT, + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + t_date DATE, + t_datetime DATETIME, + t_timestamp TIMESTAMP NULL, + t_time TIME, + t_year YEAR, + t_enum ENUM ('enum1', 'enum2', 'enum3'), + t_set SET ('a', 'b', 'c'), + t_bit BIT(64), + t_json JSON, + PRIMARY KEY (id) +); + +CREATE TABLE multi_charset ( + id INT, + name varchar(128) CHARACTER SET gbk, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob, + PRIMARY KEY (id) +) ENGINE = InnoDB CHARSET = utf8mb4; + +CREATE TABLE binary_columns +( + id INT AUTO_INCREMENT, + c_binary binary(255), + c_varbinary varbinary(255), + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + PRIMARY KEY (id) +); \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh new file mode 100644 index 00000000000..0e724226820 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -0,0 +1,120 @@ +#!/bin/bash + +set -eux + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +EXIST_FILES=() +CLEANED_FILES=() +function generate_single_table_files() { + local workdir=$1 + local bucket=$2 + local schema=$3 + local table=$4 + local day=$5 + local file_cnt=$6 + local should_clean=$7 # true or false + + table_dir=$workdir/$bucket/$schema/$table/$day + mkdir -p $table_dir + for i in $(seq 1 $file_cnt); do + file=$table_dir/$i.data + touch $file + if [ "$should_clean" == "true" ]; then + CLEANED_FILES+=($file) + else + EXIST_FILES+=($file) + fi + done + + mkdir -p $table_dir/meta + touch $table_dir/meta/CDC.index +} + +function generate_historic_files() { + local target_bucket="storage_test" + yesterday=$(date -d "yesterday" +"%Y-%m-%d") # should not be cleaned since file-expiration-days is 1 + day_before_yesterday=$(date -d "2 days ago" +"%Y-%m-%d") # should be cleaned + + # historic files of table in schema.sql + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type $yesterday 10 false + generate_single_table_files $WORK_DIR $target_bucket test multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns $day_before_yesterday 10 true + + # historic files of tables in test but not in schema.sql + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy $yesterday 10 false + + # historic files of table belongs to different schema + generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns $yesterday 10 false + + # historic files in default bucket, which should not be cleaned + generate_single_table_files $WORK_DIR storage_test_default test multi_data_type 2022-01-01 10 false + generate_single_table_files $WORK_DIR storage_test_default test multi_charset 2022-01-02 10 false + generate_single_table_files $WORK_DIR storage_test_default test binary_columns 2022-01-03 10 false +} + +function check_file_exists() { + local all_should_exist=$1 + for f in ${EXIST_FILES[@]}; do + if [ ! -f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + done + + for f in ${CLEANED_FILES[@]}; do + if [ "$all_should_exist" == "true" ]; then + if [ ! -f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + else + if [ -f $f ]; then + echo "file $f should not exist but exists" + exit 1 + fi + fi + done +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + generate_historic_files + + SINK_URI_DEFAULT="file://$WORK_DIR/storage_test_default?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI_DEFAULT" -c "default-config-test" --config=$CUR/conf/changefeed-default.toml + sleep 20 + check_file_exists true + + SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml + sleep 20 + check_file_exists false + + run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"