diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go
index fbf9425953b..c9d62df1980 100644
--- a/cdc/model/changefeed.go
+++ b/cdc/model/changefeed.go
@@ -353,6 +353,10 @@ func (info *ChangeFeedInfo) RmUnusedFields() {
 		)
 		return
 	}
+	// blackhole is for testing purpose, no need to remove fields
+	if sink.IsBlackHoleScheme(uri.Scheme) {
+		return
+	}
 	if !sink.IsMQScheme(uri.Scheme) {
 		info.rmMQOnlyFields()
 	} else {
diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go
index 16dcbd5b914..3e699852bfa 100644
--- a/cdc/model/changefeed_test.go
+++ b/cdc/model/changefeed_test.go
@@ -243,14 +243,17 @@ func TestVerifyAndComplete(t *testing.T) {
 	t.Parallel()
 
 	info := &ChangeFeedInfo{
-		SinkURI: "blackhole://",
+		SinkURI: "mysql://",
 		StartTs: 417257993615179777,
 		Config: &config.ReplicaConfig{
-			MemoryQuota:        1073741824,
-			CaseSensitive:      false,
-			CheckGCSafePoint:   true,
-			SyncPointInterval:  util.AddressOf(time.Minute * 10),
-			SyncPointRetention: util.AddressOf(time.Hour * 24),
+			MemoryQuota:           1073741824,
+			CaseSensitive:         false,
+			CheckGCSafePoint:      true,
+			EnableSyncPoint:       util.AddressOf(false),
+			SyncPointInterval:     util.AddressOf(time.Minute * 10),
+			SyncPointRetention:    util.AddressOf(time.Hour * 24),
+			BDRMode:               util.AddressOf(false),
+			IgnoreIneligibleTable: false,
 		},
 	}
 
@@ -261,7 +264,7 @@ func TestVerifyAndComplete(t *testing.T) {
 	require.Nil(t, err)
 	defaultConfig := config.GetDefaultReplicaConfig()
 	info2 := &ChangeFeedInfo{
-		SinkURI: "blackhole://",
+		SinkURI: "mysql://",
 		Config:  defaultConfig,
 	}
 	info2.RmUnusedFields()
diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go
index 5fc437c659d..3db0d083d86 100644
--- a/cdc/redo/meta_manager.go
+++ b/cdc/redo/meta_manager.go
@@ -130,6 +130,10 @@ func (m *metaManager) preStart(ctx context.Context) error {
 	}
 	// "nfs" and "local" scheme are converted to "file" scheme
 	redo.FixLocalScheme(uri)
+	// blackhole scheme is converted to "noop" scheme here, so we can use blackhole for testing
+	if redo.IsBlackholeStorage(uri.Scheme) {
+		uri, _ = storage.ParseRawURL("noop://")
+	}
 
 	extStorage, err := redo.InitExternalStorage(ctx, *uri)
 	if err != nil {
diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go
index 61dc26f4b7e..7296da25fae 100644
--- a/pkg/orchestrator/reactor_state_test.go
+++ b/pkg/orchestrator/reactor_state_test.go
@@ -117,10 +117,18 @@ func TestChangefeedStateUpdate(t *testing.T) {
 			Mounter:   &config.MounterConfig{WorkerNum: 16},
 			Scheduler: config.GetDefaultReplicaConfig().Scheduler,
 			Sink: &config.SinkConfig{
-				Terminator:          putil.AddressOf(config.CRLF),
-				AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
+				Terminator:                       putil.AddressOf(config.CRLF),
+				AdvanceTimeoutInSec:              putil.AddressOf(uint(150)),
+				CSVConfig:                        config.GetDefaultReplicaConfig().Sink.CSVConfig,
+				EncoderConcurrency:               config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
+				DateSeparator:                    config.GetDefaultReplicaConfig().Sink.DateSeparator,
+				EnablePartitionSeparator:         config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
+				EnableKafkaSinkV2:                config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
+				OnlyOutputUpdatedColumns:         config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
+				DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
 			},
-			Integrity: config.GetDefaultReplicaConfig().Integrity,
+			Consistent: config.GetDefaultReplicaConfig().Consistent,
+			Integrity:  config.GetDefaultReplicaConfig().Integrity,
 			ChangefeedErrorStuckDuration: config.
 				GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
 			SQLMode: config.GetDefaultReplicaConfig().SQLMode,
@@ -171,11 +179,19 @@ func TestChangefeedStateUpdate(t *testing.T) {
 			Filter:  &config.FilterConfig{Rules: []string{"*.*"}},
 			Mounter: &config.MounterConfig{WorkerNum: 16},
 			Sink: &config.SinkConfig{
-				Terminator:          putil.AddressOf(config.CRLF),
-				AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
+				Terminator:                       putil.AddressOf(config.CRLF),
+				AdvanceTimeoutInSec:              putil.AddressOf(uint(150)),
+				CSVConfig:                        config.GetDefaultReplicaConfig().Sink.CSVConfig,
+				EncoderConcurrency:               config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
+				DateSeparator:                    config.GetDefaultReplicaConfig().Sink.DateSeparator,
+				EnablePartitionSeparator:         config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
+				EnableKafkaSinkV2:                config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
+				OnlyOutputUpdatedColumns:         config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
+				DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
 			},
-			Scheduler: config.GetDefaultReplicaConfig().Scheduler,
-			Integrity: config.GetDefaultReplicaConfig().Integrity,
+			Scheduler:  config.GetDefaultReplicaConfig().Scheduler,
+			Integrity:  config.GetDefaultReplicaConfig().Integrity,
+			Consistent: config.GetDefaultReplicaConfig().Consistent,
 			ChangefeedErrorStuckDuration: config.
 				GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
 			SQLMode: config.GetDefaultReplicaConfig().SQLMode,
@@ -231,11 +247,19 @@ func TestChangefeedStateUpdate(t *testing.T) {
 			Filter:  &config.FilterConfig{Rules: []string{"*.*"}},
 			Mounter: &config.MounterConfig{WorkerNum: 16},
 			Sink: &config.SinkConfig{
-				Terminator:          putil.AddressOf(config.CRLF),
-				AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
+				Terminator:                       putil.AddressOf(config.CRLF),
+				AdvanceTimeoutInSec:              putil.AddressOf(uint(150)),
+				EncoderConcurrency:               config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
+				CSVConfig:                        config.GetDefaultReplicaConfig().Sink.CSVConfig,
+				DateSeparator:                    config.GetDefaultReplicaConfig().Sink.DateSeparator,
+				EnablePartitionSeparator:         config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
+				EnableKafkaSinkV2:                config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
+				OnlyOutputUpdatedColumns:         config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
+				DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
 			},
-			Scheduler: config.GetDefaultReplicaConfig().Scheduler,
-			Integrity: config.GetDefaultReplicaConfig().Integrity,
+			Consistent: config.GetDefaultReplicaConfig().Consistent,
+			Scheduler:  config.GetDefaultReplicaConfig().Scheduler,
+			Integrity:  config.GetDefaultReplicaConfig().Integrity,
 			ChangefeedErrorStuckDuration: config.
 				GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
 			SQLMode: config.GetDefaultReplicaConfig().SQLMode,
diff --git a/pkg/sink/sink_type.go b/pkg/sink/sink_type.go
index 382d8437689..5ff07c7d1f0 100644
--- a/pkg/sink/sink_type.go
+++ b/pkg/sink/sink_type.go
@@ -97,6 +97,11 @@ func IsPulsarScheme(scheme string) bool {
 	return scheme == PulsarScheme || scheme == PulsarSSLScheme
 }
 
+// IsBlackHoleScheme returns true if the scheme belong to blackhole scheme.
+func IsBlackHoleScheme(scheme string) bool {
+	return scheme == BlackHoleScheme
+}
+
 // GetScheme returns the scheme of the url.
 func GetScheme(url *url.URL) string {
 	return strings.ToLower(url.Scheme)
diff --git a/tests/integration_tests/api_v2/cases.go b/tests/integration_tests/api_v2/cases.go
index 9e336a016af..d5985097dbf 100644
--- a/tests/integration_tests/api_v2/cases.go
+++ b/tests/integration_tests/api_v2/cases.go
@@ -21,6 +21,8 @@ import (
 	"time"
 
 	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/pingcap/tiflow/pkg/util"
 	"go.uber.org/zap"
 )
 
@@ -31,6 +33,10 @@ var customReplicaConfig = &ReplicaConfig{
 	ForceReplicate:        false,
 	IgnoreIneligibleTable: false,
 	CheckGCSafePoint:      false,
+	BDRMode:               util.AddressOf(false),
+	EnableSyncPoint:       util.AddressOf(false),
+	SyncPointInterval:     util.AddressOf(JSONDuration{duration: 10 * time.Minute}),
+	SyncPointRetention:    util.AddressOf(JSONDuration{duration: 24 * time.Hour}),
 	Filter: &FilterConfig{
 		MySQLReplicationRules: &MySQLReplicationRules{
 			DoTables: []*Table{{"a", "b"}, {"c", "d"}},
@@ -65,6 +71,14 @@ var customReplicaConfig = &ReplicaConfig{
 		},
 		TxnAtomicity: "table",
 		Terminator:   "a",
+		CSVConfig: &CSVConfig{
+			Quote:      string(config.DoubleQuoteChar),
+			Delimiter:  config.Comma,
+			NullString: config.NULL,
+		},
+		DateSeparator:            "day",
+		EncoderConcurrency:       util.AddressOf(32),
+		EnablePartitionSeparator: util.AddressOf(true),
 	},
 	Scheduler: &ChangefeedSchedulerConfig{
 		EnableTableAcrossNodes: false,
@@ -74,13 +88,27 @@ var customReplicaConfig = &ReplicaConfig{
 		IntegrityCheckLevel:   "none",
 		CorruptionHandleLevel: "warn",
 	},
+	Consistent: &ConsistentConfig{
+		Level:                 "none",
+		MaxLogSize:            64,
+		FlushIntervalInMs:     2000,
+		MetaFlushIntervalInMs: 200,
+		Storage:               "",
+		UseFileBackend:        false,
+		EncoderWorkerNum:      31,
+		FlushWorkerNum:        18,
+	},
 }
 
 // defaultReplicaConfig check if the default values is changed
 var defaultReplicaConfig = &ReplicaConfig{
-	MemoryQuota:      1024 * 1024 * 1024,
-	CaseSensitive:    false,
-	CheckGCSafePoint: true,
+	MemoryQuota:        1024 * 1024 * 1024,
+	CaseSensitive:      false,
+	CheckGCSafePoint:   true,
+	EnableSyncPoint:    util.AddressOf(false),
+	SyncPointInterval:  util.AddressOf(JSONDuration{duration: 10 * time.Minute}),
+	SyncPointRetention: util.AddressOf(JSONDuration{duration: 24 * time.Hour}),
+	BDRMode:            util.AddressOf(false),
 	Filter: &FilterConfig{
 		Rules: []string{"*.*"},
 	},
@@ -88,7 +116,15 @@ var defaultReplicaConfig = &ReplicaConfig{
 		WorkerNum: 16,
 	},
 	Sink: &SinkConfig{
-		Terminator: "\r\n",
+		CSVConfig: &CSVConfig{
+			Quote:      string(config.DoubleQuoteChar),
+			Delimiter:  config.Comma,
+			NullString: config.NULL,
+		},
+		Terminator:               "\r\n",
+		DateSeparator:            "day",
+		EncoderConcurrency:       util.AddressOf(32),
+		EnablePartitionSeparator: util.AddressOf(true),
 	},
 	Scheduler: &ChangefeedSchedulerConfig{
 		EnableTableAcrossNodes: false,
@@ -98,6 +134,16 @@ var defaultReplicaConfig = &ReplicaConfig{
 		IntegrityCheckLevel:   "none",
 		CorruptionHandleLevel: "warn",
 	},
+	Consistent: &ConsistentConfig{
+		Level:                 "none",
+		MaxLogSize:            64,
+		FlushIntervalInMs:     2000,
+		MetaFlushIntervalInMs: 200,
+		EncoderWorkerNum:      16,
+		FlushWorkerNum:        8,
+		Storage:               "",
+		UseFileBackend:        false,
+	},
 }
 
 func testStatus(ctx context.Context, client *CDCRESTClient) error {
@@ -143,7 +189,9 @@ func testChangefeed(ctx context.Context, client *CDCRESTClient) error {
 		log.Panic("failed to unmarshal response", zap.String("body", string(resp.body)), zap.Error(err))
 	}
 	if !reflect.DeepEqual(cfInfo.Config, defaultReplicaConfig) {
-		log.Panic("config is not equals", zap.Any("add", defaultReplicaConfig), zap.Any("get", cfInfo.Config))
+		log.Panic("config is not equals",
+			zap.Any("add", defaultReplicaConfig),
+			zap.Any("get", cfInfo.Config))
 	}
 
 	// pause changefeed
@@ -191,7 +239,9 @@ func testChangefeed(ctx context.Context, client *CDCRESTClient) error {
 		log.Panic("unmarshal failed", zap.String("body", string(resp.body)), zap.Error(err))
 	}
 	if !reflect.DeepEqual(cf.Config, customReplicaConfig) {
-		log.Panic("config is not equals", zap.Any("update", customReplicaConfig), zap.Any("get", cf.Config))
+		log.Panic("config is not equals",
+			zap.Any("update", customReplicaConfig),
+			zap.Any("get", cf.Config))
 	}
 
 	// list changefeed
diff --git a/tests/integration_tests/api_v2/model.go b/tests/integration_tests/api_v2/model.go
index 3c2006f5c14..99a153f32ec 100644
--- a/tests/integration_tests/api_v2/model.go
+++ b/tests/integration_tests/api_v2/model.go
@@ -276,7 +276,7 @@ type ConsistentConfig struct {
 	MaxLogSize            int64  `json:"max_log_size"`
 	FlushIntervalInMs     int64  `json:"flush_interval"`
 	MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
-	EncoderWorkerNum      int    `json:"encoder_worker_num"`
+	EncoderWorkerNum      int    `json:"encoding_worker_num"`
 	FlushWorkerNum        int    `json:"flush_worker_num"`
 	Storage               string `json:"storage"`
 	UseFileBackend        bool   `json:"use_file_backend"`
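
Note: everything the patch gates on is a plain string comparison against the parsed URL scheme. Below is a minimal, self-contained Go sketch of that flow; the blackHoleScheme constant and the noop:// rewrite are stand-ins mirroring what the hunks show (the real logic lives behind sink.IsBlackHoleScheme, redo.IsBlackholeStorage, and storage.ParseRawURL), not the actual tiflow definitions.

package main

import (
	"fmt"
	"net/url"
)

// Stand-in for the BlackHoleScheme constant in pkg/sink; value assumed.
const blackHoleScheme = "blackhole"

// isBlackHoleScheme mirrors the new sink.IsBlackHoleScheme helper:
// a bare equality check on the scheme string.
func isBlackHoleScheme(scheme string) bool {
	return scheme == blackHoleScheme
}

func main() {
	uri, err := url.Parse("blackhole://")
	if err != nil {
		panic(err)
	}
	if isBlackHoleScheme(uri.Scheme) {
		// changefeed.go path: a blackhole sink skips field pruning entirely.
		fmt.Println("blackhole sink: keeping all fields for testing")
		// meta_manager.go path: rewrite the URI to a no-op storage before
		// initializing external storage, as the redo hunk does.
		uri, _ = url.Parse("noop://")
	}
	fmt.Println("effective redo scheme:", uri.Scheme)
}

Routing blackhole changefeeds to a no-op redo backend lets tests exercise the redo code path without provisioning real external storage.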