Skip to content

Commit

Permalink
cleanup expired files by day
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Nov 22, 2023
1 parent 68dc49c commit 600cdb8
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 36 deletions.
30 changes: 18 additions & 12 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var cloudStorageConfig *config.CloudStorageConfig
if c.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
}
}

Expand Down Expand Up @@ -711,10 +713,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var cloudStorageConfig *CloudStorageConfig
if cloned.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
}
}

Expand Down Expand Up @@ -1195,10 +1199,12 @@ type MySQLConfig struct {

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
100 changes: 93 additions & 7 deletions cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@ import (
"context"
"encoding/json"
"net/url"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/util"
"github.com/robfig/cron"
"go.uber.org/zap"
)

Expand All @@ -43,8 +45,9 @@ type DDLSink struct {
// statistic is used to record the DDL metrics
statistics *metrics.Statistics
storage storage.ExternalStorage
cfg *cloudstorage.Config

outputColumnID bool
lastCheckpointTs atomic.Uint64
lastSendCheckpointTsTime time.Time
}

Expand All @@ -54,6 +57,22 @@ func NewDDLSink(ctx context.Context,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
) (*DDLSink, error) {
return newDDLSink(ctx, changefeedID, sinkURI, replicaConfig, nil)
}

func newDDLSink(ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
cleanupJob func(),
) (*DDLSink, error) {
// create cloud storage config and then apply the params of sinkURI to it.
cfg := cloudstorage.NewConfig()
err := cfg.Apply(ctx, sinkURI, replicaConfig)
if err != nil {
return nil, err
}

storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String())
if err != nil {
return nil, err
Expand All @@ -63,13 +82,17 @@ func NewDDLSink(ctx context.Context,
id: changefeedID,
storage: storage,
statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink),
cfg: cfg,
lastSendCheckpointTsTime: time.Now(),
}

if replicaConfig != nil && replicaConfig.Sink.CloudStorageConfig != nil {
d.outputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID)
// Note: It is intended to run the cleanup goroutine in the background.
// We don't wait for it to finish since the goroutine would be stuck if
// the downstream is abnormal, especially when the downstream is an NFS.
if cleanupJob == nil {
cleanupJob = d.runCleanup(ctx)
}

go d.bgCleanup(ctx, cleanupJob)
return d, nil
}

Expand Down Expand Up @@ -98,15 +121,15 @@ func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
}

var def cloudstorage.TableDefinition
def.FromDDLEvent(ddl, d.outputColumnID)
def.FromDDLEvent(ddl, d.cfg.OutputColumnID)
if err := writeFile(def); err != nil {
return errors.Trace(err)
}

if ddl.Type == timodel.ActionExchangeTablePartition {
// For exchange partition, we need to write the schema of the source table.
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.outputColumnID)
sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.cfg.OutputColumnID)
return writeFile(sourceTableDef)
}
return nil
Expand All @@ -125,6 +148,7 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context,

defer func() {
d.lastSendCheckpointTsTime = time.Now()
d.lastCheckpointTs.Store(ts)
}()
ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts})
if err != nil {
Expand All @@ -134,6 +158,68 @@ func (d *DDLSink) WriteCheckpointTs(ctx context.Context,
return errors.Trace(err)
}

// bgCleanup schedules cleanupJob according to d.cfg.FileCleanupCronSpec and
// blocks until ctx is done, at which point the scheduler is stopped.
//
// Scheduling is skipped (and the function returns immediately) when the date
// separator is not "day" or when FileExpirationDays is not positive. It also
// returns early, after logging an error, when the cron spec cannot be
// registered; otherwise an empty scheduler would be started and the cleanup
// would silently never run.
func (d *DDLSink) bgCleanup(ctx context.Context, cleanupJob func()) {
	if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 {
		log.Info("skip cleanup expired files for storage sink",
			zap.String("namespace", d.id.Namespace),
			zap.String("changefeedID", d.id.ID),
			zap.String("date-separator", d.cfg.DateSeparator),
			zap.Int("expired-file-ttl", d.cfg.FileExpirationDays))
		return
	}

	cleanupCron := cron.New()
	// AddFunc reports an error for a spec it cannot parse; do not ignore it.
	if err := cleanupCron.AddFunc(d.cfg.FileCleanupCronSpec, cleanupJob); err != nil {
		log.Error("failed to schedule cleanup expired files for storage sink",
			zap.String("namespace", d.id.Namespace),
			zap.String("changefeedID", d.id.ID),
			zap.String("file-cleanup-cron-spec", d.cfg.FileCleanupCronSpec),
			zap.Error(err))
		return
	}
	cleanupCron.Start()
	defer cleanupCron.Stop()
	log.Info("start schedule cleanup expired files for storage sink",
		zap.String("namespace", d.id.Namespace),
		zap.String("changefeedID", d.id.ID),
		zap.String("date-separator", d.cfg.DateSeparator),
		zap.Int("expired-file-ttl", d.cfg.FileExpirationDays))

	// wait for the context done
	<-ctx.Done()
	log.Info("stop schedule cleanup expired files for storage sink",
		zap.String("namespace", d.id.Namespace),
		zap.String("changefeedID", d.id.ID),
		zap.Error(ctx.Err()))
}

// runCleanup builds the job that removes expired files from the storage.
//
// The returned closure owns an atomic "running" flag: if a previous
// invocation has not finished yet, the new invocation logs a warning and
// returns without doing anything. Otherwise it calls
// cloudstorage.RemoveExpiredFiles with the most recently stored checkpoint ts
// and logs either the failure or the number of removed files together with
// the elapsed time.
func (d *DDLSink) runCleanup(ctx context.Context) func() {
	var running atomic.Bool

	return func() {
		// Only one cleanup round may be in flight at a time.
		if acquired := running.CompareAndSwap(false, true); !acquired {
			log.Warn("cleanup expired files is already running, skip this round",
				zap.String("namespace", d.id.Namespace),
				zap.String("changefeedID", d.id.ID))
			return
		}
		defer running.Store(false)

		begin := time.Now()
		ts := d.lastCheckpointTs.Load()
		removed, err := cloudstorage.RemoveExpiredFiles(ctx, d.storage, d.cfg, ts)
		if err == nil {
			log.Info("remove expired files",
				zap.String("namespace", d.id.Namespace),
				zap.String("changefeedID", d.id.ID),
				zap.Uint64("checkpointTs", ts),
				zap.Uint64("count", removed),
				zap.Duration("cost", time.Since(begin)))
			return
		}

		log.Error("failed to remove expired files",
			zap.String("namespace", d.id.Namespace),
			zap.String("changefeedID", d.id.ID),
			zap.Uint64("checkpointTs", ts),
			zap.Duration("cost", time.Since(begin)),
			zap.Error(err),
		)
	}
}

// Close closes the sink.
func (d *DDLSink) Close() {
if d.statistics != nil {
Expand Down
45 changes: 41 additions & 4 deletions cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

func TestWriteDDLEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s", parentDir)
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, nil)
replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.Nil(t, err)
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig)
require.Nil(t, err)

ddlEvent := &model.DDLEvent{
Expand Down Expand Up @@ -97,10 +102,13 @@ func TestWriteCheckpointTs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s", parentDir)
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, nil)
replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.Nil(t, err)
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig)
require.Nil(t, err)
tables := []*model.TableInfo{
{
Expand Down Expand Up @@ -132,3 +140,32 @@ func TestWriteCheckpointTs(t *testing.T) {
require.Nil(t, err)
require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata))
}

// TestCleanupExpiredFiles verifies that a DDL sink created with a positive
// FileExpirationDays and an every-second cron spec invokes the injected
// cleanup job at least once within three seconds.
func TestCleanupExpiredFiles(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	parentDir := t.TempDir()
	uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
	sinkURI, err := url.Parse(uri)
	require.Nil(t, err)
	replicaConfig := config.GetDefaultReplicaConfig()
	replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{
		FileExpirationDays:  util.AddressOf(1),
		FileCleanupCronSpec: util.AddressOf("* * * * * *"),
	}
	err = replicaConfig.ValidateAndAdjust(sinkURI)
	require.Nil(t, err)

	// The job is invoked from the scheduler's goroutine, so signal through a
	// channel instead of mutating a plain int that the test goroutine reads
	// (which is a data race). The send is non-blocking so that later rounds
	// never stall the scheduler once the buffer is full.
	fired := make(chan struct{}, 1)
	cleanupJob := func() {
		select {
		case fired <- struct{}{}:
		default:
		}
	}
	sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJob)
	require.Nil(t, err)
	require.NotNil(t, sink)

	// Return as soon as the first round fires; fail if none fires in time.
	select {
	case <-fired:
	case <-time.After(3 * time.Second):
		require.Fail(t, "cleanup job was not triggered within 3 seconds")
	}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ require (
github.com/jfcg/sixb v1.3.8 // indirect
github.com/jfcg/sorty/v2 v2.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,8 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down
20 changes: 19 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,22 @@ func (d *DateSeparator) FromString(separator string) error {
return nil
}

// GetPattern returns the regular-expression fragment that matches the date
// component produced by this separator: a four-digit year, year-month, or
// year-month-day. DateSeparatorNone and any unrecognized value yield the
// empty string.
func (d DateSeparator) GetPattern() string {
	switch d {
	case DateSeparatorYear:
		return `\d{4}`
	case DateSeparatorMonth:
		return `\d{4}-\d{2}`
	case DateSeparatorDay:
		return `\d{4}-\d{2}-\d{2}`
	}
	// DateSeparatorNone and unknown separators have no date component.
	return ""
}

func (d DateSeparator) String() string {
switch d {
case DateSeparatorNone:
Expand Down Expand Up @@ -577,7 +593,9 @@ type CloudStorageConfig struct {
FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"`
FileSize *int `toml:"file-size" json:"file-size,omitempty"`

OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
Expand Down
22 changes: 19 additions & 3 deletions pkg/sink/cloudstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ const (
minFileSize = 1024 * 1024
// the upper limit of file size
maxFileSize = 512 * 1024 * 1024

// disable file cleanup by default
defaultFileExpirationDays = 0
// Second | Minute | Hour | Dom | Month | DowOptional
// `0 0 2 * * *` means 2:00:00 AM every day
defaultFileCleanupCronSpec = "0 0 2 * * *"
)

type urlConfig struct {
Expand All @@ -63,16 +69,20 @@ type Config struct {
FileSize int
FileIndexWidth int
DateSeparator string
FileExpirationDays int
FileCleanupCronSpec string
EnablePartitionSeparator bool
OutputColumnID bool
}

// NewConfig returns the default cloud storage sink config.
func NewConfig() *Config {
return &Config{
WorkerCount: defaultWorkerCount,
FlushInterval: defaultFlushInterval,
FileSize: defaultFileSize,
WorkerCount: defaultWorkerCount,
FlushInterval: defaultFlushInterval,
FileSize: defaultFileSize,
FileExpirationDays: defaultFileExpirationDays,
FileCleanupCronSpec: defaultFileCleanupCronSpec,
}
}

Expand Down Expand Up @@ -117,6 +127,12 @@ func (c *Config) Apply(
c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth)
if replicaConfig.Sink.CloudStorageConfig != nil {
c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID)
if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil {
c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays
}
if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil {
c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec
}
}

if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth {
Expand Down
Loading

0 comments on commit 600cdb8

Please sign in to comment.