diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 267e5b6cef0..6fa402001f0 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -336,6 +336,70 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( LargeMessageHandle: largeMessageHandle, } } +<<<<<<< HEAD +======= + var mysqlConfig *config.MySQLConfig + if c.Sink.MySQLConfig != nil { + mysqlConfig = &config.MySQLConfig{ + WorkerCount: c.Sink.MySQLConfig.WorkerCount, + MaxTxnRow: c.Sink.MySQLConfig.MaxTxnRow, + MaxMultiUpdateRowSize: c.Sink.MySQLConfig.MaxMultiUpdateRowSize, + MaxMultiUpdateRowCount: c.Sink.MySQLConfig.MaxMultiUpdateRowCount, + TiDBTxnMode: c.Sink.MySQLConfig.TiDBTxnMode, + SSLCa: c.Sink.MySQLConfig.SSLCa, + SSLCert: c.Sink.MySQLConfig.SSLCert, + SSLKey: c.Sink.MySQLConfig.SSLKey, + TimeZone: c.Sink.MySQLConfig.TimeZone, + WriteTimeout: c.Sink.MySQLConfig.WriteTimeout, + ReadTimeout: c.Sink.MySQLConfig.ReadTimeout, + Timeout: c.Sink.MySQLConfig.Timeout, + EnableBatchDML: c.Sink.MySQLConfig.EnableBatchDML, + EnableMultiStatement: c.Sink.MySQLConfig.EnableMultiStatement, + EnableCachePreparedStatement: c.Sink.MySQLConfig.EnableCachePreparedStatement, + } + } + var cloudStorageConfig *config.CloudStorageConfig + if c.Sink.CloudStorageConfig != nil { + cloudStorageConfig = &config.CloudStorageConfig{ + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + } + } + + res.Sink = &config.SinkConfig{ + DispatchRules: dispatchRules, + Protocol: c.Sink.Protocol, + CSVConfig: csvConfig, + ColumnSelectors: columnSelectors, + SchemaRegistry: c.Sink.SchemaRegistry, + EncoderConcurrency: c.Sink.EncoderConcurrency, + Terminator: c.Sink.Terminator, + DateSeparator: c.Sink.DateSeparator, 
+ EnablePartitionSeparator: c.Sink.EnablePartitionSeparator, + FileIndexWidth: c.Sink.FileIndexWidth, + EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2, + OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns, + DeleteOnlyOutputHandleKeyColumns: c.Sink.DeleteOnlyOutputHandleKeyColumns, + ContentCompatible: c.Sink.ContentCompatible, + KafkaConfig: kafkaConfig, + MySQLConfig: mysqlConfig, + PulsarConfig: pulsarConfig, + CloudStorageConfig: cloudStorageConfig, + SafeMode: c.Sink.SafeMode, + } + + if c.Sink.TxnAtomicity != nil { + res.Sink.TxnAtomicity = util.AddressOf(config.AtomicityLevel(*c.Sink.TxnAtomicity)) + } + if c.Sink.AdvanceTimeoutInSec != nil { + res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec) + } + +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) } if c.Mounter != nil { res.Mounter = &config.MounterConfig{ @@ -464,6 +528,139 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience, LargeMessageHandle: largeMessageHandle, } +<<<<<<< HEAD +======= + + kafkaConfig = &KafkaConfig{ + PartitionNum: cloned.Sink.KafkaConfig.PartitionNum, + ReplicationFactor: cloned.Sink.KafkaConfig.ReplicationFactor, + KafkaVersion: cloned.Sink.KafkaConfig.KafkaVersion, + MaxMessageBytes: cloned.Sink.KafkaConfig.MaxMessageBytes, + Compression: cloned.Sink.KafkaConfig.Compression, + KafkaClientID: cloned.Sink.KafkaConfig.KafkaClientID, + AutoCreateTopic: cloned.Sink.KafkaConfig.AutoCreateTopic, + DialTimeout: cloned.Sink.KafkaConfig.DialTimeout, + WriteTimeout: cloned.Sink.KafkaConfig.WriteTimeout, + ReadTimeout: cloned.Sink.KafkaConfig.ReadTimeout, + RequiredAcks: cloned.Sink.KafkaConfig.RequiredAcks, + SASLUser: cloned.Sink.KafkaConfig.SASLUser, + SASLPassword: cloned.Sink.KafkaConfig.SASLPassword, + SASLMechanism: cloned.Sink.KafkaConfig.SASLMechanism, + SASLGssAPIAuthType: cloned.Sink.KafkaConfig.SASLGssAPIAuthType, + 
SASLGssAPIKeytabPath: cloned.Sink.KafkaConfig.SASLGssAPIKeytabPath, + SASLGssAPIKerberosConfigPath: cloned.Sink.KafkaConfig.SASLGssAPIKerberosConfigPath, + SASLGssAPIServiceName: cloned.Sink.KafkaConfig.SASLGssAPIServiceName, + SASLGssAPIUser: cloned.Sink.KafkaConfig.SASLGssAPIUser, + SASLGssAPIPassword: cloned.Sink.KafkaConfig.SASLGssAPIPassword, + SASLGssAPIRealm: cloned.Sink.KafkaConfig.SASLGssAPIRealm, + SASLGssAPIDisablePafxfast: cloned.Sink.KafkaConfig.SASLGssAPIDisablePafxfast, + SASLOAuthClientID: cloned.Sink.KafkaConfig.SASLOAuthClientID, + SASLOAuthClientSecret: cloned.Sink.KafkaConfig.SASLOAuthClientSecret, + SASLOAuthTokenURL: cloned.Sink.KafkaConfig.SASLOAuthTokenURL, + SASLOAuthScopes: cloned.Sink.KafkaConfig.SASLOAuthScopes, + SASLOAuthGrantType: cloned.Sink.KafkaConfig.SASLOAuthGrantType, + SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience, + EnableTLS: cloned.Sink.KafkaConfig.EnableTLS, + CA: cloned.Sink.KafkaConfig.CA, + Cert: cloned.Sink.KafkaConfig.Cert, + Key: cloned.Sink.KafkaConfig.Key, + InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify, + CodecConfig: codeConfig, + LargeMessageHandle: largeMessageHandle, + GlueSchemaRegistryConfig: glueSchemaRegistryConfig, + } + } + var mysqlConfig *MySQLConfig + if cloned.Sink.MySQLConfig != nil { + mysqlConfig = &MySQLConfig{ + WorkerCount: cloned.Sink.MySQLConfig.WorkerCount, + MaxTxnRow: cloned.Sink.MySQLConfig.MaxTxnRow, + MaxMultiUpdateRowSize: cloned.Sink.MySQLConfig.MaxMultiUpdateRowSize, + MaxMultiUpdateRowCount: cloned.Sink.MySQLConfig.MaxMultiUpdateRowCount, + TiDBTxnMode: cloned.Sink.MySQLConfig.TiDBTxnMode, + SSLCa: cloned.Sink.MySQLConfig.SSLCa, + SSLCert: cloned.Sink.MySQLConfig.SSLCert, + SSLKey: cloned.Sink.MySQLConfig.SSLKey, + TimeZone: cloned.Sink.MySQLConfig.TimeZone, + WriteTimeout: cloned.Sink.MySQLConfig.WriteTimeout, + ReadTimeout: cloned.Sink.MySQLConfig.ReadTimeout, + Timeout: cloned.Sink.MySQLConfig.Timeout, + EnableBatchDML: 
cloned.Sink.MySQLConfig.EnableBatchDML, + EnableMultiStatement: cloned.Sink.MySQLConfig.EnableMultiStatement, + EnableCachePreparedStatement: cloned.Sink.MySQLConfig.EnableCachePreparedStatement, + } + } + var pulsarConfig *PulsarConfig + if cloned.Sink.PulsarConfig != nil { + pulsarConfig = &PulsarConfig{ + TLSKeyFilePath: cloned.Sink.PulsarConfig.TLSKeyFilePath, + TLSCertificateFile: cloned.Sink.PulsarConfig.TLSCertificateFile, + TLSTrustCertsFilePath: cloned.Sink.PulsarConfig.TLSTrustCertsFilePath, + PulsarProducerCacheSize: cloned.Sink.PulsarConfig.PulsarProducerCacheSize, + PulsarVersion: cloned.Sink.PulsarConfig.PulsarVersion, + CompressionType: (*string)(cloned.Sink.PulsarConfig.CompressionType), + AuthenticationToken: cloned.Sink.PulsarConfig.AuthenticationToken, + ConnectionTimeout: (*int)(cloned.Sink.PulsarConfig.ConnectionTimeout), + OperationTimeout: (*int)(cloned.Sink.PulsarConfig.OperationTimeout), + BatchingMaxMessages: cloned.Sink.PulsarConfig.BatchingMaxMessages, + BatchingMaxPublishDelay: (*int)(cloned.Sink.PulsarConfig.BatchingMaxPublishDelay), + SendTimeout: (*int)(cloned.Sink.PulsarConfig.SendTimeout), + TokenFromFile: cloned.Sink.PulsarConfig.TokenFromFile, + BasicUserName: cloned.Sink.PulsarConfig.BasicUserName, + BasicPassword: cloned.Sink.PulsarConfig.BasicPassword, + AuthTLSCertificatePath: cloned.Sink.PulsarConfig.AuthTLSCertificatePath, + AuthTLSPrivateKeyPath: cloned.Sink.PulsarConfig.AuthTLSPrivateKeyPath, + } + if cloned.Sink.PulsarConfig.OAuth2 != nil { + pulsarConfig.OAuth2 = &PulsarOAuth2{ + OAuth2IssuerURL: cloned.Sink.PulsarConfig.OAuth2.OAuth2IssuerURL, + OAuth2Audience: cloned.Sink.PulsarConfig.OAuth2.OAuth2Audience, + OAuth2PrivateKey: cloned.Sink.PulsarConfig.OAuth2.OAuth2PrivateKey, + OAuth2ClientID: cloned.Sink.PulsarConfig.OAuth2.OAuth2ClientID, + OAuth2Scope: cloned.Sink.PulsarConfig.OAuth2.OAuth2Scope, + } + } + } + var cloudStorageConfig *CloudStorageConfig + if cloned.Sink.CloudStorageConfig != nil { + 
cloudStorageConfig = &CloudStorageConfig{ + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + } + } + + res.Sink = &SinkConfig{ + Protocol: cloned.Sink.Protocol, + SchemaRegistry: cloned.Sink.SchemaRegistry, + DispatchRules: dispatchRules, + CSVConfig: csvConfig, + ColumnSelectors: columnSelectors, + EncoderConcurrency: cloned.Sink.EncoderConcurrency, + Terminator: cloned.Sink.Terminator, + DateSeparator: cloned.Sink.DateSeparator, + EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator, + FileIndexWidth: cloned.Sink.FileIndexWidth, + EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2, + OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns, + DeleteOnlyOutputHandleKeyColumns: cloned.Sink.DeleteOnlyOutputHandleKeyColumns, + ContentCompatible: cloned.Sink.ContentCompatible, + KafkaConfig: kafkaConfig, + MySQLConfig: mysqlConfig, + PulsarConfig: pulsarConfig, + CloudStorageConfig: cloudStorageConfig, + SafeMode: cloned.Sink.SafeMode, + } + + if cloned.Sink.TxnAtomicity != nil { + res.Sink.TxnAtomicity = util.AddressOf(string(*cloned.Sink.TxnAtomicity)) + } + if cloned.Sink.AdvanceTimeoutInSec != nil { + res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) } } if cloned.Consistent != nil { @@ -785,6 +982,118 @@ type Capture struct { ClusterID string `json:"cluster_id"` } +<<<<<<< HEAD +======= +// CodecConfig represents a MQ codec configuration +type CodecConfig struct { + EnableTiDBExtension *bool `json:"enable_tidb_extension,omitempty"` + MaxBatchSize *int `json:"max_batch_size,omitempty"` + 
AvroEnableWatermark *bool `json:"avro_enable_watermark"` + AvroDecimalHandlingMode *string `json:"avro_decimal_handling_mode,omitempty"` + AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty"` +} + +// PulsarConfig represents a pulsar sink configuration +type PulsarConfig struct { + TLSKeyFilePath *string `json:"tls-certificate-path,omitempty"` + TLSCertificateFile *string `json:"tls-private-key-path,omitempty"` + TLSTrustCertsFilePath *string `json:"tls-trust-certs-file-path,omitempty"` + PulsarProducerCacheSize *int32 `json:"pulsar-producer-cache-size,omitempty"` + PulsarVersion *string `json:"pulsar-version,omitempty"` + CompressionType *string `json:"compression-type,omitempty"` + AuthenticationToken *string `json:"authentication-token,omitempty"` + ConnectionTimeout *int `json:"connection-timeout,omitempty"` + OperationTimeout *int `json:"operation-timeout,omitempty"` + BatchingMaxMessages *uint `json:"batching-max-messages,omitempty"` + BatchingMaxPublishDelay *int `json:"batching-max-publish-delay,omitempty"` + SendTimeout *int `json:"send-timeout,omitempty"` + TokenFromFile *string `json:"token-from-file,omitempty"` + BasicUserName *string `json:"basic-user-name,omitempty"` + BasicPassword *string `json:"basic-password,omitempty"` + AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty"` + AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"` + OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"` +} + +// PulsarOAuth2 is the configuration for OAuth2 +type PulsarOAuth2 struct { + OAuth2IssuerURL string `json:"oauth2-issuer-url,omitempty"` + OAuth2Audience string `json:"oauth2-audience,omitempty"` + OAuth2PrivateKey string `json:"oauth2-private-key,omitempty"` + OAuth2ClientID string `json:"oauth2-client-id,omitempty"` + OAuth2Scope string `json:"oauth2-scope,omitempty"` +} + +// KafkaConfig represents a kafka sink configuration +type KafkaConfig struct { + PartitionNum *int32 
`json:"partition_num,omitempty"` + ReplicationFactor *int16 `json:"replication_factor,omitempty"` + KafkaVersion *string `json:"kafka_version,omitempty"` + MaxMessageBytes *int `json:"max_message_bytes,omitempty"` + Compression *string `json:"compression,omitempty"` + KafkaClientID *string `json:"kafka_client_id,omitempty"` + AutoCreateTopic *bool `json:"auto_create_topic,omitempty"` + DialTimeout *string `json:"dial_timeout,omitempty"` + WriteTimeout *string `json:"write_timeout,omitempty"` + ReadTimeout *string `json:"read_timeout,omitempty"` + RequiredAcks *int `json:"required_acks,omitempty"` + SASLUser *string `json:"sasl_user,omitempty"` + SASLPassword *string `json:"sasl_password,omitempty"` + SASLMechanism *string `json:"sasl_mechanism,omitempty"` + SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty"` + SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty"` + SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty"` + SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty"` + SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty"` + SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty"` + SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty"` + SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"` + SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"` + SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"` + SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"` + SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"` + SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"` + EnableTLS *bool `json:"enable_tls,omitempty"` + CA *string `json:"ca,omitempty"` + Cert *string `json:"cert,omitempty"` + Key *string `json:"key,omitempty"` + InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"` + 
CodecConfig *CodecConfig `json:"codec_config,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` + GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"` +} + +// MySQLConfig represents a MySQL sink configuration +type MySQLConfig struct { + WorkerCount *int `json:"worker_count,omitempty"` + MaxTxnRow *int `json:"max_txn_row,omitempty"` + MaxMultiUpdateRowSize *int `json:"max_multi_update_row_size,omitempty"` + MaxMultiUpdateRowCount *int `json:"max_multi_update_row_count,omitempty"` + TiDBTxnMode *string `json:"tidb_txn_mode,omitempty"` + SSLCa *string `json:"ssl_ca,omitempty"` + SSLCert *string `json:"ssl_cert,omitempty"` + SSLKey *string `json:"ssl_key,omitempty"` + TimeZone *string `json:"time_zone,omitempty"` + WriteTimeout *string `json:"write_timeout,omitempty"` + ReadTimeout *string `json:"read_timeout,omitempty"` + Timeout *string `json:"timeout,omitempty"` + EnableBatchDML *bool `json:"enable_batch_dml,omitempty"` + EnableMultiStatement *bool `json:"enable_multi_statement,omitempty"` + EnableCachePreparedStatement *bool `json:"enable_cache_prepared_statement,omitempty"` +} + +// CloudStorageConfig represents a cloud storage sink configuration +type CloudStorageConfig struct { + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` +} + +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) // ChangefeedStatus holds common information of a changefeed in cdc type ChangefeedStatus struct { State string `json:"state,omitempty"` diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go 
b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 0d8f24bd5d7..35facc857b0 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -17,19 +17,27 @@ import ( "context" "encoding/json" "net/url" + "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink" "github.com/pingcap/tiflow/cdc/sinkv2/metrics" +======= + "github.com/pingcap/tiflow/cdc/sink/ddlsink" + "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/util" + "github.com/robfig/cron" "go.uber.org/zap" ) @@ -42,12 +50,43 @@ type ddlSink struct { // statistic is used to record the DDL metrics statistics *metrics.Statistics storage storage.ExternalStorage + cfg *cloudstorage.Config + cron *cron.Cron +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go lastSendCheckpointTsTime time.Time } // NewCloudStorageDDLSink creates a ddl sink for cloud storage. func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) { +======= + lastCheckpointTs atomic.Uint64 + lastSendCheckpointTsTime time.Time +} + +// NewDDLSink creates a ddl sink for cloud storage. 
+func NewDDLSink(ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, +) (*DDLSink, error) { + return newDDLSink(ctx, changefeedID, sinkURI, replicaConfig, nil) +} + +func newDDLSink(ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + cleanupJobs []func(), /* only for test */ +) (*DDLSink, error) { + // create cloud storage config and then apply the params of sinkURI to it. + cfg := cloudstorage.NewConfig() + err := cfg.Apply(ctx, sinkURI, replicaConfig) + if err != nil { + return nil, errors.Trace(err) + } + +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String()) if err != nil { return nil, err @@ -57,10 +96,25 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er d := &ddlSink{ id: changefeedID, storage: storage, +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go statistics: metrics.NewStatistics(ctx, sink.TxnSink), lastSendCheckpointTsTime: time.Now(), } +======= + statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink), + cfg: cfg, + lastSendCheckpointTsTime: time.Now(), + } + + if err := d.initCron(ctx, sinkURI, cleanupJobs); err != nil { + return nil, errors.Trace(err) + } + // Note: It is intended to run the cleanup goroutine in the background. + // we don't wait for it to finish since the goroutine would be stuck if + // the downstream is abnormal, especially when the downstream is a nfs. 
+ go d.bgCleanup(ctx) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go return d, nil } @@ -89,7 +143,11 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error } var def cloudstorage.TableDefinition +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go def.FromDDLEvent(ddl) +======= + def.FromDDLEvent(ddl, d.cfg.OutputColumnID) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go if err := writeFile(def); err != nil { return errors.Trace(err) } @@ -97,7 +155,11 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error if ddl.Type == timodel.ActionExchangeTablePartition { // For exchange partition, we need to write the schema of the source table. var sourceTableDef cloudstorage.TableDefinition +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version) +======= + sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version, d.cfg.OutputColumnID) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go return writeFile(sourceTableDef) } return nil @@ -115,6 +177,7 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context, defer func() { d.lastSendCheckpointTsTime = time.Now() + d.lastCheckpointTs.Store(ts) }() ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts}) if err != nil { @@ -124,7 +187,135 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context, return errors.Trace(err) } +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go func (d *ddlSink) Close() error { +======= +// initCron builds the cron scheduler and registers every cleanup job under +// the configured FileCleanupCronSpec; when cleanupJobs is nil the jobs come +// from genCleanupJob. It does not start the scheduler. +func (d *DDLSink) initCron( + ctx context.Context, sinkURI *url.URL, cleanupJobs []func(), +) (err error) { + if cleanupJobs == 
nil { + cleanupJobs = d.genCleanupJob(ctx, sinkURI) + } + + d.cron = cron.New() + for _, job := range cleanupJobs { + err = d.cron.AddFunc(d.cfg.FileCleanupCronSpec, job) + if err != nil { + return err + } + } + return nil +} + +// bgCleanup runs the cron scheduler until ctx is done. It returns at once +// unless files are separated by day and FileExpirationDays is positive. +func (d *DDLSink) bgCleanup(ctx context.Context) { + if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 { + log.Info("skip cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) + return + } + + d.cron.Start() + defer d.cron.Stop() + log.Info("start schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.String("dateSeparator", d.cfg.DateSeparator), + zap.Int("expiredFileTTL", d.cfg.FileExpirationDays)) + + // wait for the context done + <-ctx.Done() + log.Info("stop schedule cleanup expired files for storage sink", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Error(ctx.Err())) +} + +// genCleanupJob returns the jobs run on every cron tick: removing empty +// directories (local URIs only) and removing expired files. Each job skips a +// tick while its previous run is still in progress. +func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) []func() { + ret := []func(){} + + isLocal := uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" + isRemoveEmptyDirsRuning := atomic.Bool{} + if isLocal { + ret = append(ret, func() { + if !isRemoveEmptyDirsRuning.CompareAndSwap(false, true) { + log.Warn("remove empty dirs is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + // Release the guard when this round ends; otherwise every later + // round would be skipped as "already running". + defer isRemoveEmptyDirsRuning.Store(false) + checkpointTs := d.lastCheckpointTs.Load() + start := time.Now() + cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path) + if err != nil { + log.Error("failed to remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", 
time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove empty dirs", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + } + + isCleanupRunning := atomic.Bool{} + ret = append(ret, func() { + if !isCleanupRunning.CompareAndSwap(false, true) { + log.Warn("cleanup expired files is already running, skip this round", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID)) + return + } + + defer isCleanupRunning.Store(false) + start := time.Now() + checkpointTs := d.lastCheckpointTs.Load() + cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs) + if err != nil { + log.Error("failed to remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove expired files", + zap.String("namespace", d.id.Namespace), + zap.String("changefeedID", d.id.ID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + return ret +} + +// Close closes the sink. 
+func (d *DDLSink) Close() { +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go if d.statistics != nil { d.statistics.Close() } diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index 23c10e83bea..564de18bc8b 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -19,6 +19,7 @@ import ( "net/url" "os" "path" + "sync/atomic" "testing" "time" @@ -26,6 +27,8 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -33,10 +36,17 @@ func TestWriteDDLEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go sink, err := NewCloudStorageDDLSink(ctx, sinkURI) +======= + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go require.Nil(t, err) ddlEvent := &model.DDLEvent{ @@ -97,10 +107,17 @@ func TestWriteCheckpointTs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() parentDir := t.TempDir() - uri := fmt.Sprintf("file:///%s", parentDir) + uri := fmt.Sprintf("file:///%s?protocol=csv", 
parentDir) sinkURI, err := url.Parse(uri) require.Nil(t, err) +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go sink, err := NewCloudStorageDDLSink(ctx, sinkURI) +======= + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig) +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go require.Nil(t, err) tables := []*model.TableInfo{ { @@ -132,3 +149,34 @@ func TestWriteCheckpointTs(t *testing.T) { require.Nil(t, err) require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata)) } + +func TestCleanupExpiredFiles(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.Nil(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.Nil(t, err) + + cnt := atomic.Int64{} + cleanupJobs := []func(){ + func() { + cnt.Add(1) + }, + } + sink, err := newDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, cleanupJobs) + require.Nil(t, err) + + _ = sink + time.Sleep(3 * time.Second) + require.LessOrEqual(t, int64(1), cnt.Load()) +} diff --git a/go.mod b/go.mod index 6020e112874..10435e08cee 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,13 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/r3labs/diff v1.1.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 +<<<<<<< HEAD github.com/shirou/gopsutil/v3 v3.23.1 +======= + github.com/robfig/cron v1.2.0 + 
github.com/segmentio/kafka-go v0.4.41-0.20230526171612-f057b1d369cd + github.com/shirou/gopsutil/v3 v3.23.5 +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) github.com/shopspring/decimal v1.3.0 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.6.1 diff --git a/go.sum b/go.sum index 67dffa10fa5..f0c41ed4b2e 100644 --- a/go.sum +++ b/go.sum @@ -1108,8 +1108,17 @@ github.com/remyoudompheng/bigfft v0.0.0-20220927061507-ef77025ab5aa/go.mod h1:qq github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +<<<<<<< HEAD github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8= github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +======= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/pkg/config/sink.go b/pkg/config/sink.go index a796f8372ac..236d05ef847 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go 
@@ -253,6 +253,22 @@ func (d *DateSeparator) FromString(separator string) error { return nil } +// GetPattern returns the pattern of the date separator. +func (d DateSeparator) GetPattern() string { + switch d { + case DateSeparatorNone: + return "" + case DateSeparatorYear: + return `\d{4}` + case DateSeparatorMonth: + return `\d{4}-\d{2}` + case DateSeparatorDay: + return `\d{4}-\d{2}-\d{2}` + default: + return "" + } +} + func (d DateSeparator) String() string { switch d { case DateSeparatorNone: @@ -302,6 +318,215 @@ func (k *KafkaConfig) MaskSensitiveData() { } } +<<<<<<< HEAD +======= +// PulsarCompressionType is the compression type for pulsar +type PulsarCompressionType string + +// Value returns the pulsar compression type +func (p *PulsarCompressionType) Value() pulsar.CompressionType { + if p == nil { + return 0 + } + switch strings.ToLower(string(*p)) { + case "lz4": + return pulsar.LZ4 + case "zlib": + return pulsar.ZLib + case "zstd": + return pulsar.ZSTD + default: + return 0 // default is no compression + } +} + +// TimeMill is the time in milliseconds +type TimeMill int + +// Duration returns the time in seconds as a duration +func (t *TimeMill) Duration() time.Duration { + if t == nil { + return 0 + } + return time.Duration(*t) * time.Millisecond +} + +// NewTimeMill returns a new time in milliseconds +func NewTimeMill(x int) *TimeMill { + t := TimeMill(x) + return &t +} + +// TimeSec is the time in seconds +type TimeSec int + +// Duration returns the time in seconds as a duration +func (t *TimeSec) Duration() time.Duration { + if t == nil { + return 0 + } + return time.Duration(*t) * time.Second +} + +// NewTimeSec returns a new time in seconds +func NewTimeSec(x int) *TimeSec { + t := TimeSec(x) + return &t +} + +// OAuth2 is the configuration for OAuth2 +type OAuth2 struct { + // OAuth2IssuerURL the URL of the authorization server. 
+ OAuth2IssuerURL string `toml:"oauth2-issuer-url" json:"oauth2-issuer-url,omitempty"` + // OAuth2Audience the URL of the resource server. + OAuth2Audience string `toml:"oauth2-audience" json:"oauth2-audience,omitempty"` + // OAuth2PrivateKey the private key used to sign the server. + OAuth2PrivateKey string `toml:"oauth2-private-key" json:"oauth2-private-key,omitempty"` + // OAuth2ClientID the client ID of the application. + OAuth2ClientID string `toml:"oauth2-client-id" json:"oauth2-client-id,omitempty"` + // OAuth2Scope scope + OAuth2Scope string `toml:"oauth2-scope" json:"oauth2-scope,omitempty"` +} + +func (o *OAuth2) validate() (err error) { + if o == nil { + return nil + } + if len(o.OAuth2IssuerURL) == 0 || len(o.OAuth2ClientID) == 0 || len(o.OAuth2PrivateKey) == 0 || + len(o.OAuth2Audience) == 0 { + return fmt.Errorf("issuer-url and audience and private-key and client-id not be empty") + } + return nil +} + +// PulsarConfig pulsar sink configuration +type PulsarConfig struct { + TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"` + TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"` + TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` + + // PulsarProducerCacheSize is the size of the cache of pulsar producers + PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"` + + // PulsarVersion print the version of pulsar + PulsarVersion *string `toml:"pulsar-version" json:"pulsar-version,omitempty"` + + // pulsar client compression + CompressionType *PulsarCompressionType `toml:"compression-type" json:"compression-type,omitempty"` + + // AuthenticationToken the token for the Pulsar server + AuthenticationToken *string `toml:"authentication-token" json:"authentication-token,omitempty"` + + // ConnectionTimeout Timeout for the establishment of a TCP connection (default: 5 
seconds) + ConnectionTimeout *TimeSec `toml:"connection-timeout" json:"connection-timeout,omitempty"` + + // Set the operation timeout (default: 30 seconds) + // Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the + // operation will be marked as failed + OperationTimeout *TimeSec `toml:"operation-timeout" json:"operation-timeout,omitempty"` + + // BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000) + BatchingMaxMessages *uint `toml:"batching-max-messages" json:"batching-max-messages,omitempty"` + + // BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms) + // if batch messages are enabled. If set to a non zero value, messages will be queued until this time + // interval or until + BatchingMaxPublishDelay *TimeMill `toml:"batching-max-publish-delay" json:"batching-max-publish-delay,omitempty"` + + // SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent. + // Send and SendAsync returns an error after timeout. 
+ // default: 30s + SendTimeout *TimeSec `toml:"send-timeout" json:"send-timeout,omitempty"` + + // TokenFromFile Authentication from the file token, + // the path name of the file (the third priority authentication method) + TokenFromFile *string `toml:"token-from-file" json:"token-from-file,omitempty"` + + // BasicUserName Account name for pulsar basic authentication (the second priority authentication method) + BasicUserName *string `toml:"basic-user-name" json:"basic-user-name,omitempty"` + // BasicPassword with account + BasicPassword *string `toml:"basic-password" json:"basic-password,omitempty"` + + // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key + AuthTLSCertificatePath *string `toml:"auth-tls-certificate-path" json:"auth-tls-certificate-path,omitempty"` + // AuthTLSPrivateKeyPath private key + AuthTLSPrivateKeyPath *string `toml:"auth-tls-private-key-path" json:"auth-tls-private-key-path,omitempty"` + + // Oauth2 include oauth2-issuer-url oauth2-audience oauth2-private-key oauth2-client-id + // and 'type' always use 'client_credentials' + OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"` + + // BrokerURL is used to configure service brokerUrl for the Pulsar service. + // This parameter is a part of the `sink-uri`. Internal use only. + BrokerURL string `toml:"-" json:"-"` + // SinkURI is the parsed sinkURI. Internal use only. 
+ SinkURI *url.URL `toml:"-" json:"-"` +} + +// MaskSensitiveData masks sensitive data in PulsarConfig +func (c *PulsarConfig) MaskSensitiveData() { + if c.AuthenticationToken != nil { + c.AuthenticationToken = aws.String("******") + } + if c.BasicPassword != nil { + c.BasicPassword = aws.String("******") + } + if c.OAuth2 != nil { + c.OAuth2.OAuth2PrivateKey = "******" + } +} + +// Check get broker url +func (c *PulsarConfig) validate() (err error) { + if c.OAuth2 != nil { + if err = c.OAuth2.validate(); err != nil { + return err + } + if c.TLSTrustCertsFilePath == nil { + return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty") + } + } + + return nil +} + +// GetDefaultTopicName get default topic name +func (c *PulsarConfig) GetDefaultTopicName() string { + topicName := c.SinkURI.Path + return topicName[1:] +} + +// MySQLConfig represents a MySQL sink configuration +type MySQLConfig struct { + WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` + MaxTxnRow *int `toml:"max-txn-row" json:"max-txn-row,omitempty"` + MaxMultiUpdateRowSize *int `toml:"max-multi-update-row-size" json:"max-multi-update-row-size,omitempty"` + MaxMultiUpdateRowCount *int `toml:"max-multi-update-row" json:"max-multi-update-row,omitempty"` + TiDBTxnMode *string `toml:"tidb-txn-mode" json:"tidb-txn-mode,omitempty"` + SSLCa *string `toml:"ssl-ca" json:"ssl-ca,omitempty"` + SSLCert *string `toml:"ssl-cert" json:"ssl-cert,omitempty"` + SSLKey *string `toml:"ssl-key" json:"ssl-key,omitempty"` + TimeZone *string `toml:"time-zone" json:"time-zone,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + Timeout *string `toml:"timeout" json:"timeout,omitempty"` + EnableBatchDML *bool `toml:"enable-batch-dml" json:"enable-batch-dml,omitempty"` + EnableMultiStatement *bool `toml:"enable-multi-statement" json:"enable-multi-statement,omitempty"` + 
EnableCachePreparedStatement *bool `toml:"enable-cache-prepared-statement" json:"enable-cache-prepared-statement,omitempty"` +} + +// CloudStorageConfig represents a cloud storage sink configuration +type CloudStorageConfig struct { + WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` + FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` + FileSize *int `toml:"file-size" json:"file-size,omitempty"` + + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` + FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` + FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` +} + +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { return err diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 75cd00e186b..d436df48126 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -45,6 +45,12 @@ const ( minFileSize = 1024 * 1024 // the upper limit of file size maxFileSize = 512 * 1024 * 1024 + + // disable file cleanup by default + defaultFileExpirationDays = 0 + // Second | Minute | Hour | Dom | Month | DowOptional + // `0 0 2 * * ?` means 2:00:00 AM every day + defaultFileCleanupCronSpec = "0 0 2 * * *" ) // Config is the configuration for cloud storage sink. @@ -54,15 +60,19 @@ type Config struct { FileSize int FileIndexWidth int DateSeparator string + FileExpirationDays int + FileCleanupCronSpec string EnablePartitionSeparator bool } // NewConfig returns the default cloud storage sink config. 
func NewConfig() *Config { return &Config{ - WorkerCount: defaultWorkerCount, - FlushInterval: defaultFlushInterval, - FileSize: defaultFileSize, + WorkerCount: defaultWorkerCount, + FlushInterval: defaultFlushInterval, + FileSize: defaultFileSize, + FileExpirationDays: defaultFileExpirationDays, + FileCleanupCronSpec: defaultFileCleanupCronSpec, } } @@ -95,9 +105,24 @@ func (c *Config) Apply( return err } +<<<<<<< HEAD c.DateSeparator = replicaConfig.Sink.DateSeparator c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth +======= + c.DateSeparator = util.GetOrZero(replicaConfig.Sink.DateSeparator) + c.EnablePartitionSeparator = util.GetOrZero(replicaConfig.Sink.EnablePartitionSeparator) + c.FileIndexWidth = util.GetOrZero(replicaConfig.Sink.FileIndexWidth) + if replicaConfig.Sink.CloudStorageConfig != nil { + c.OutputColumnID = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputColumnID) + if replicaConfig.Sink.CloudStorageConfig.FileExpirationDays != nil { + c.FileExpirationDays = *replicaConfig.Sink.CloudStorageConfig.FileExpirationDays + } + if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { + c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec + } + } +>>>>>>> dd3f8dfc25 (sink(ticdc): cleanup expired files by day for storage sink (#10097)) if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index 6ca2844a762..d2ea2545a82 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -17,10 +17,14 @@ import ( "context" "fmt" "io" + "io/fs" + "os" "path" + "path/filepath" "regexp" "strconv" "strings" + "time" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" @@ -29,6 +33,8 @@ import ( "github.com/pingcap/tiflow/pkg/config" 
"github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/util" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -403,3 +409,70 @@ func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, err return fileIdx, nil } + +var dateSeparatorDayRegexp *regexp.Regexp + +// RemoveExpiredFiles removes expired files from external storage. +func RemoveExpiredFiles( + ctx context.Context, + _ model.ChangeFeedID, + storage storage.ExternalStorage, + cfg *Config, + checkpointTs model.Ts, +) (uint64, error) { + if cfg.DateSeparator != config.DateSeparatorDay.String() { + return 0, nil + } + if dateSeparatorDayRegexp == nil { + dateSeparatorDayRegexp = regexp.MustCompile(config.DateSeparatorDay.GetPattern()) + } + + ttl := time.Duration(cfg.FileExpirationDays) * time.Hour * 24 + currTime := oracle.GetTimeFromTS(checkpointTs).Add(-ttl) + expiredDate := currTime.Format("2006-01-02") + + cnt := uint64(0) + err := util.RemoveFilesIf(ctx, storage, func(path string) bool { + // the path is like: /////CDC{num}.extension + match := dateSeparatorDayRegexp.FindString(path) + if match != "" && match < expiredDate { + cnt++ + return true + } + return false + }, nil) + return cnt, err +} + +// RemoveEmptyDirs removes empty directories from external storage. +func RemoveEmptyDirs( + ctx context.Context, + id model.ChangeFeedID, + target string, +) (uint64, error) { + cnt := uint64(0) + err := filepath.Walk(target, func(path string, info fs.FileInfo, err error) error { + if os.IsNotExist(err) || path == target || info == nil { + // if path not exists, we should return nil to continue. 
+ return nil + } + if err != nil { + return err + } + if info.IsDir() { + files, err := os.ReadDir(path) + if err == nil && len(files) == 0 { + log.Debug("Deleting empty directory", + zap.String("namespace", id.Namespace), + zap.String("changeFeedID", id.ID), + zap.String("path", path)) + os.Remove(path) + cnt++ + return filepath.SkipDir + } + } + return nil + }) + + return cnt, err +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index ead5254fff4..e95fcc74f16 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FilePathGenerator { @@ -328,3 +329,89 @@ func TestCheckOrWriteSchema(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(cnt)) } + +func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() + uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) + storage, err := util.GetExternalStorageFromURI(ctx, uri) + require.NoError(t, err) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.DateSeparator = util.AddressOf(config.DateSeparatorDay.String()) + replicaConfig.Sink.Protocol = util.AddressOf(config.ProtocolCsv.String()) + replicaConfig.Sink.FileIndexWidth = util.AddressOf(6) + replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + cfg := NewConfig() + err = cfg.Apply(ctx, sinkURI, replicaConfig) + require.NoError(t, err) + + // generate some expired files + filesWithoutPartition := []string{ + // schma1-table1 + 
"schema1/table1/5/2021-01-01/CDC000001.csv", + "schema1/table1/5/2021-01-01/CDC000002.csv", + "schema1/table1/5/2021-01-01/CDC000003.csv", + "schema1/table1/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma1-table2 + "schema1/table2/5/2021-01-01/CDC000001.csv", + "schema1/table2/5/2021-01-01/CDC000002.csv", + "schema1/table2/5/2021-01-01/CDC000003.csv", + "schema1/table2/5/2021-01-01/" + defaultIndexFileName, // index + "schema1/table2/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithoutPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesWithPartition := []string{ + // schma1-table1 + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000001.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000002.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/CDC000003.csv", + "schema1/table1/400200133/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema1/table1/meta/schema_5_20210101.json", // schema should never be cleaned + // schma2-table1 + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000001.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000002.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/CDC000003.csv", + "schema2/table1/400200150/12/2021-01-01/20210101/" + defaultIndexFileName, // index + "schema2/table1/meta/schema_5_20210101.json", // schema should never be cleaned + } + for _, file := range filesWithPartition { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + filesNotExpired := []string{ + // schma1-table1 + "schema1/table1/5/2021-01-02/CDC000001.csv", + "schema1/table1/5/2021-01-02/CDC000002.csv", + "schema1/table1/5/2021-01-02/CDC000003.csv", + "schema1/table1/5/2021-01-02/" + defaultIndexFileName, // index + // schma1-table2 + "schema1/table2/5/2021-01-02/CDC000001.csv", 
+ "schema1/table2/5/2021-01-02/CDC000002.csv", + "schema1/table2/5/2021-01-02/CDC000003.csv", + "schema1/table2/5/2021-01-02/" + defaultIndexFileName, // index + } + for _, file := range filesNotExpired { + err := storage.WriteFile(ctx, file, []byte("test")) + require.NoError(t, err) + } + + currTime := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC) + checkpointTs := oracle.GoTimeToTS(currTime) + cnt, err := RemoveExpiredFiles(ctx, model.ChangeFeedID{}, storage, cfg, checkpointTs) + require.NoError(t, err) + require.Equal(t, uint64(16), cnt) +} diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go index 3c4e69d6f05..34ec47fbd82 100644 --- a/pkg/util/external_storage.go +++ b/pkg/util/external_storage.go @@ -259,38 +259,38 @@ func RemoveFilesIf( } log.Debug("Removing files", zap.Any("toRemoveFiles", toRemoveFiles)) - - for _, path := range toRemoveFiles { - if err := extStorage.DeleteFile(ctx, path); err != nil { - return errors.ErrExternalStorageAPI.Wrap(err) - } - } return DeleteFilesInExtStorage(ctx, extStorage, toRemoveFiles) } // DeleteFilesInExtStorage deletes files in external storage concurrently. +// TODO: Add a test for this function to cover batch delete. 
func DeleteFilesInExtStorage( ctx context.Context, extStorage storage.ExternalStorage, toRemoveFiles []string, ) error { limit := make(chan struct{}, 32) + batch := 3000 eg, egCtx := errgroup.WithContext(ctx) - for _, file := range toRemoveFiles { + for len(toRemoveFiles) > 0 { select { case <-egCtx.Done(): return egCtx.Err() case limit <- struct{}{}: } - name := file + if len(toRemoveFiles) < batch { + batch = len(toRemoveFiles) + } + files := toRemoveFiles[:batch] eg.Go(func() error { defer func() { <-limit }() - err := extStorage.DeleteFile(egCtx, name) + err := extStorage.DeleteFiles(egCtx, files) if err != nil && !IsNotExistInExtStorage(err) { // if fail then retry, may end up with notExit err, ignore the error return errors.ErrExternalStorageAPI.Wrap(err) } return nil }) + toRemoveFiles = toRemoveFiles[batch:] } return eg.Wait() } diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index f17ca2477f7..f92be287dee 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -19,7 +19,7 @@ kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic canal_jso kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" storage_only="lossy_ddl storage_csv_update" -storage_only_csv="csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" +storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table" # Define groups diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml new file mode 100644 index 00000000000..74f1f934a8a --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed-default.toml @@ -0,0 +1,16 @@ +[sink] +protocol = "csv" +# Line terminator. 
Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `day`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true diff --git a/tests/integration_tests/storage_cleanup/conf/changefeed.toml b/tests/integration_tests/storage_cleanup/conf/changefeed.toml new file mode 100644 index 00000000000..60663c5d577 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/changefeed.toml @@ -0,0 +1,22 @@ +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `day`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. 
+include-commit-ts = true + +[sink.cloud-storage-config] +file-expiration-days = 1 +# Second | Minute | Hour | Dom | Month | DowOptional +# cleanup every second +file-cleanup-cron-spec = "* * * * * *" \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/conf/diff_config.toml b/tests/integration_tests/storage_cleanup/conf/diff_config.toml new file mode 100644 index 00000000000..4ff72cef8f9 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/storage_cleanup/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/storage_cleanup/data/data.sql b/tests/integration_tests/storage_cleanup/data/data.sql new file mode 100644 index 00000000000..cec3db763d5 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/data.sql @@ -0,0 +1,84 @@ +use `test`; +-- make sure `nullable` can be handled properly. 
+INSERT INTO multi_data_type() VALUES (); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -1, 1, -129, 129, -65536, 65536, -16777216, 16777216, -2147483649, 2147483649 + , true, 123.456, 123.123, 123456789012.123456789012 + , '测', '测试', x'89504E470D0A1A0A', x'89504E470D0A1A0A', '测试tinytext', '测试text', '测试mediumtext', '测试longtext' + , 'tinyblob', 'blob', 'mediumblob', 'longblob' + , '1977-01-01', '9999-12-31 23:59:59', '19731230153000', '23:59:59', 2022 + , 'enum2', 1 + , 'a,b', NULL); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_enum, t_bit + , t_set, t_json) +VALUES ( -2, 2, -130, 130, -65537, 65537, -16777217, 16777217, -2147483650, 2147483650 + , false, 123.4567, 123.1237, 123456789012.1234567890127 + , '2', '测试2', x'89504E470D0A1A0B', x'89504E470D0A1A0B', '测试2tinytext', '测试2text', '测试2mediumtext', '测试longtext' + , 'tinyblob2', 'blob2', 'mediumblob2', 'longblob2' + , '2021-01-01', '2021-12-31 23:59:59', '19731230153000', '22:59:59', 2021 + , 'enum1', 2 + , 'a,b,c', '{ + "id": 1, + "name": "hello" + }'); + +UPDATE multi_data_type +SET t_boolean = false +WHERE id = 1; + +DELETE +FROM multi_data_type +WHERE id = 3; + +INSERT INTO multi_charset +VALUES (1, '测试', "中国", "上海", 
"你好,世界" + , 0xC4E3BAC3CAC0BDE7); + +INSERT INTO multi_charset +VALUES (2, '部署', "美国", "纽约", "世界,你好" + , 0xCAC0BDE7C4E3BAC3); + +UPDATE multi_charset +SET name = '开发' +WHERE name = '测试'; + +DELETE FROM multi_charset +WHERE name = '部署' + AND country = '美国' + AND city = '纽约' + AND description = '世界,你好'; + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + 
x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + 
x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); diff --git a/tests/integration_tests/storage_cleanup/data/schema.sql b/tests/integration_tests/storage_cleanup/data/schema.sql new file mode 100644 index 00000000000..d6934693e4e --- /dev/null +++ b/tests/integration_tests/storage_cleanup/data/schema.sql @@ -0,0 +1,64 @@ +USE `test`; + +CREATE TABLE multi_data_type +( + id INT AUTO_INCREMENT, + t_tinyint TINYINT, + t_tinyint_unsigned TINYINT UNSIGNED, + t_smallint SMALLINT, + 
t_smallint_unsigned SMALLINT UNSIGNED, + t_mediumint MEDIUMINT, + t_mediumint_unsigned MEDIUMINT UNSIGNED, + t_int INT, + t_int_unsigned INT UNSIGNED, + t_bigint BIGINT, + t_bigint_unsigned BIGINT UNSIGNED, + t_boolean BOOLEAN, + t_float FLOAT(6, 2), + t_double DOUBLE(6, 2), + t_decimal DECIMAL(38, 19), + t_char CHAR, + t_varchar VARCHAR(10), + c_binary binary(16), + c_varbinary varbinary(16), + t_tinytext TINYTEXT, + t_text TEXT, + t_mediumtext MEDIUMTEXT, + t_longtext LONGTEXT, + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + t_date DATE, + t_datetime DATETIME, + t_timestamp TIMESTAMP NULL, + t_time TIME, + t_year YEAR, + t_enum ENUM ('enum1', 'enum2', 'enum3'), + t_set SET ('a', 'b', 'c'), + t_bit BIT(64), + t_json JSON, + PRIMARY KEY (id) +); + +CREATE TABLE multi_charset ( + id INT, + name varchar(128) CHARACTER SET gbk, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob, + PRIMARY KEY (id) +) ENGINE = InnoDB CHARSET = utf8mb4; + +CREATE TABLE binary_columns +( + id INT AUTO_INCREMENT, + c_binary binary(255), + c_varbinary varbinary(255), + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + PRIMARY KEY (id) +); \ No newline at end of file diff --git a/tests/integration_tests/storage_cleanup/run.sh b/tests/integration_tests/storage_cleanup/run.sh new file mode 100644 index 00000000000..0e724226820 --- /dev/null +++ b/tests/integration_tests/storage_cleanup/run.sh @@ -0,0 +1,120 @@ +#!/bin/bash + +set -eux + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +EXIST_FILES=() +CLEANED_FILES=() +function generate_single_table_files() { + local workdir=$1 + local bucket=$2 + local schema=$3 + local table=$4 + local day=$5 + local file_cnt=$6 + local should_clean=$7 # true or false + + 
table_dir=$workdir/$bucket/$schema/$table/$day + mkdir -p $table_dir + for i in $(seq 1 $file_cnt); do + file=$table_dir/$i.data + touch $file + if [ "$should_clean" == "true" ]; then + CLEANED_FILES+=($file) + else + EXIST_FILES+=($file) + fi + done + + mkdir -p $table_dir/meta + touch $table_dir/meta/CDC.index +} + +function generate_historic_files() { + local target_bucket="storage_test" + yesterday=$(date -d "yesterday" +"%Y-%m-%d") # should not be cleaned since file-expiration-days is 1 + day_before_yesterday=$(date -d "2 days ago" +"%Y-%m-%d") # should be cleaned + + # historic files of table in schema.sql + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type $yesterday 10 false + generate_single_table_files $WORK_DIR $target_bucket test multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns $day_before_yesterday 10 true + + # historic files of tables in test but not in schema.sql + generate_single_table_files $WORK_DIR $target_bucket test multi_data_type_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test multi_charset_dummy $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test binary_columns_dummy $yesterday 10 false + + # historic files of table belongs to different schema + generate_single_table_files $WORK_DIR $target_bucket test2 multi_data_type $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 multi_charset $day_before_yesterday 10 true + generate_single_table_files $WORK_DIR $target_bucket test2 binary_columns $yesterday 10 false + + # historic files in default bucket, which should not be cleaned + generate_single_table_files $WORK_DIR storage_test_default test multi_data_type 2022-01-01 10 false + generate_single_table_files $WORK_DIR storage_test_default test multi_charset 2022-01-02 10 false + generate_single_table_files $WORK_DIR 
storage_test_default test binary_columns 2022-01-03 10 false +} + +function check_file_exists() { + local all_should_exist=$1 + for f in ${EXIST_FILES[@]}; do + if [ ! -f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + done + + for f in ${CLEANED_FILES[@]}; do + if [ "$all_should_exist" == "true" ]; then + if [ ! -f $f ]; then + echo "file $f should exist but not" + exit 1 + fi + else + if [ -f $f ]; then + echo "file $f should not exist but exists" + exit 1 + fi + fi + done +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + generate_historic_files + + SINK_URI_DEFAULT="file://$WORK_DIR/storage_test_default?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI_DEFAULT" -c "default-config-test" --config=$CUR/conf/changefeed-default.toml + sleep 20 + check_file_exists true + + SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml + sleep 20 + check_file_exists false + + run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"