Commit: fix lint

sdojjy committed Nov 21, 2023
1 parent 77816a0 commit 66de8ee

Showing 6 changed files with 1 addition and 244 deletions.
79 changes: 0 additions & 79 deletions cdc/model/changefeed.go
@@ -326,85 +326,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() {
 	if info.Config.ChangefeedErrorStuckDuration == 0 {
 		info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration
 	}
-<<<<<<< HEAD
-=======
-	if info.Config.SQLMode == "" {
-		info.Config.SQLMode = defaultConfig.SQLMode
-	}
-	info.RmUnusedFields()
-}
-
-// RmUnusedFields removes unnecessary fields based on the downstream type and
-// the protocol. Since we utilize a common changefeed configuration template,
-// certain fields may not be utilized for certain protocols.
-func (info *ChangeFeedInfo) RmUnusedFields() {
-	uri, err := url.Parse(info.SinkURI)
-	if err != nil {
-		log.Warn(
-			"failed to parse the sink uri",
-			zap.Error(err),
-			zap.Any("sinkUri", info.SinkURI),
-		)
-		return
-	}
-	// blackhole is for testing purpose, no need to remove fields
-	if sink.IsBlackHoleScheme(uri.Scheme) {
-		return
-	}
-	if !sink.IsMQScheme(uri.Scheme) {
-		info.rmMQOnlyFields()
-	} else {
-		// remove schema registry for MQ downstream with
-		// protocol other than avro
-		if util.GetOrZero(info.Config.Sink.Protocol) != config.ProtocolAvro.String() {
-			info.Config.Sink.SchemaRegistry = nil
-		}
-	}
-
-	if !sink.IsStorageScheme(uri.Scheme) {
-		info.rmStorageOnlyFields()
-	}
-
-	if !sink.IsMySQLCompatibleScheme(uri.Scheme) {
-		info.rmDBOnlyFields()
-	} else {
-		// remove fields only being used by MQ and Storage downstream
-		info.Config.Sink.Protocol = nil
-		info.Config.Sink.Terminator = nil
-	}
-}
-
-func (info *ChangeFeedInfo) rmMQOnlyFields() {
-	log.Info("since the downstream is not a MQ, remove MQ only fields",
-		zap.String("namespace", info.Namespace),
-		zap.String("changefeed", info.ID))
-	info.Config.Sink.DispatchRules = nil
-	info.Config.Sink.SchemaRegistry = nil
-	info.Config.Sink.EncoderConcurrency = nil
-	info.Config.Sink.EnableKafkaSinkV2 = nil
-	info.Config.Sink.OnlyOutputUpdatedColumns = nil
-	info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil
-	info.Config.Sink.ContentCompatible = nil
-	info.Config.Sink.KafkaConfig = nil
-}
-
-func (info *ChangeFeedInfo) rmStorageOnlyFields() {
-	info.Config.Sink.CSVConfig = nil
-	info.Config.Sink.DateSeparator = nil
-	info.Config.Sink.EnablePartitionSeparator = nil
-	info.Config.Sink.FileIndexWidth = nil
-	info.Config.Sink.CloudStorageConfig = nil
-}
-
-func (info *ChangeFeedInfo) rmDBOnlyFields() {
-	info.Config.EnableSyncPoint = nil
-	info.Config.BDRMode = nil
-	info.Config.SyncPointInterval = nil
-	info.Config.SyncPointRetention = nil
-	info.Config.Consistent = nil
-	info.Config.Sink.SafeMode = nil
-	info.Config.Sink.MySQLConfig = nil
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 }
 
 // FixIncompatible fixes incompatible changefeed meta info.
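Note: the deleted RmUnusedFields dispatches on the sink URI scheme and prunes configuration fields that the chosen downstream can never use. Below is a minimal, self-contained sketch of that dispatch pattern; the Config type, field set, and scheme lists are hypothetical stand-ins, not the TiCDC implementation.

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// Config is a hypothetical, trimmed-down stand-in for the changefeed
// configuration: MQ-only and MySQL-only knobs live side by side.
type Config struct {
	SchemaRegistry *string // meaningful only for Kafka with the Avro protocol
	SafeMode       *bool   // meaningful only for MySQL-compatible sinks
}

func isMQScheme(s string) bool    { return s == "kafka" || s == "kafka+ssl" }
func isMySQLScheme(s string) bool { return s == "mysql" || s == "tidb" }

// pruneUnused nils out fields the downstream cannot use, mirroring the
// shape (not the detail) of the removed RmUnusedFields.
func pruneUnused(sinkURI string, cfg *Config) error {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return err
	}
	scheme := strings.ToLower(u.Scheme)
	if !isMQScheme(scheme) {
		cfg.SchemaRegistry = nil
	}
	if !isMySQLScheme(scheme) {
		cfg.SafeMode = nil
	}
	return nil
}

func main() {
	reg, safe := "http://registry:8081", true
	cfg := &Config{SchemaRegistry: &reg, SafeMode: &safe}
	if err := pruneUnused("mysql://root@127.0.0.1:4000/", cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.SchemaRegistry == nil, *cfg.SafeMode) // true true
}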
21 changes: 1 addition & 20 deletions cdc/model/changefeed_test.go
@@ -142,26 +142,15 @@ func TestVerifyAndComplete(t *testing.T) {
 	t.Parallel()
 
 	info := &ChangeFeedInfo{
-		SinkURI: "mysql://",
+		SinkURI: "blackhole://",
 		StartTs: 417257993615179777,
 		Config: &config.ReplicaConfig{
-<<<<<<< HEAD
 			MemoryQuota:        1073741824,
 			CaseSensitive:      false,
 			EnableOldValue:     true,
 			CheckGCSafePoint:   true,
 			SyncPointInterval:  time.Minute * 10,
 			SyncPointRetention: time.Hour * 24,
-=======
-			MemoryQuota:           1073741824,
-			CaseSensitive:         false,
-			CheckGCSafePoint:      true,
-			EnableSyncPoint:       util.AddressOf(false),
-			SyncPointInterval:     util.AddressOf(time.Minute * 10),
-			SyncPointRetention:    util.AddressOf(time.Hour * 24),
-			BDRMode:               util.AddressOf(false),
-			IgnoreIneligibleTable: false,
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 		},
 	}
 
@@ -171,14 +160,6 @@ func TestVerifyAndComplete(t *testing.T) {
 	marshalConfig1, err := info.Config.Marshal()
 	require.Nil(t, err)
 	defaultConfig := config.GetDefaultReplicaConfig()
-<<<<<<< HEAD
-=======
-	info2 := &ChangeFeedInfo{
-		SinkURI: "mysql://",
-		Config:  defaultConfig,
-	}
-	info2.RmUnusedFields()
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 	marshalConfig2, err := defaultConfig.Marshal()
 	require.Nil(t, err)
 	require.Equal(t, marshalConfig2, marshalConfig1)
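Note: VerifyAndComplete, which this test exercises, backfills every zero-valued option from the default configuration before the changefeed runs. A minimal sketch of that fill-from-defaults pattern, with a hypothetical two-field config and made-up default values:

package main

import (
	"fmt"
	"time"
)

// ReplicaConfig is a hypothetical two-field stand-in.
type ReplicaConfig struct {
	MemoryQuota                  uint64
	ChangefeedErrorStuckDuration time.Duration
}

// defaultReplicaConfig returns assumed defaults for the sketch.
func defaultReplicaConfig() *ReplicaConfig {
	return &ReplicaConfig{
		MemoryQuota:                  1024 * 1024 * 1024,
		ChangefeedErrorStuckDuration: 30 * time.Minute,
	}
}

// verifyAndComplete replaces zero values with defaults, mirroring the
// shape of ChangeFeedInfo.VerifyAndComplete in the diff above.
func verifyAndComplete(cfg *ReplicaConfig) {
	def := defaultReplicaConfig()
	if cfg.MemoryQuota == 0 {
		cfg.MemoryQuota = def.MemoryQuota
	}
	if cfg.ChangefeedErrorStuckDuration == 0 {
		cfg.ChangefeedErrorStuckDuration = def.ChangefeedErrorStuckDuration
	}
}

func main() {
	cfg := &ReplicaConfig{MemoryQuota: 42} // explicit value survives
	verifyAndComplete(cfg)
	fmt.Println(cfg.MemoryQuota, cfg.ChangefeedErrorStuckDuration) // 42 30m0s
}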
50 changes: 0 additions & 50 deletions pkg/orchestrator/reactor_state_test.go
@@ -125,26 +125,10 @@ func TestChangefeedStateUpdate(t *testing.T) {
 			Filter:  &config.FilterConfig{Rules: []string{"*.*"}},
 			Mounter: &config.MounterConfig{WorkerNum: 16},
 			Sink: &config.SinkConfig{
-<<<<<<< HEAD
 				Protocol:            "open-protocol",
 				AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec,
 			},
 			Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
-=======
-				Terminator:                       putil.AddressOf(config.CRLF),
-				AdvanceTimeoutInSec:              putil.AddressOf(uint(150)),
-				CSVConfig:                        config.GetDefaultReplicaConfig().Sink.CSVConfig,
-				EncoderConcurrency:               config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
-				DateSeparator:                    config.GetDefaultReplicaConfig().Sink.DateSeparator,
-				EnablePartitionSeparator:         config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
-				EnableKafkaSinkV2:                config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
-				OnlyOutputUpdatedColumns:         config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
-				DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
-				ContentCompatible:                config.GetDefaultReplicaConfig().Sink.ContentCompatible,
-			},
-			Consistent: config.GetDefaultReplicaConfig().Consistent,
-			Integrity:  config.GetDefaultReplicaConfig().Integrity,
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 			ChangefeedErrorStuckDuration: config.
 				GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
 		},
@@ -194,27 +178,10 @@ func TestChangefeedStateUpdate(t *testing.T) {
 			Filter:  &config.FilterConfig{Rules: []string{"*.*"}},
 			Mounter: &config.MounterConfig{WorkerNum: 16},
 			Sink: &config.SinkConfig{
-<<<<<<< HEAD
 				Protocol:            "open-protocol",
 				AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec,
 			},
 			Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
-=======
-				Terminator:                       putil.AddressOf(config.CRLF),
-				AdvanceTimeoutInSec:              putil.AddressOf(uint(150)),
-				CSVConfig:                        config.GetDefaultReplicaConfig().Sink.CSVConfig,
-				EncoderConcurrency:               config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
-				DateSeparator:                    config.GetDefaultReplicaConfig().Sink.DateSeparator,
-				EnablePartitionSeparator:         config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
-				EnableKafkaSinkV2:                config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
-				OnlyOutputUpdatedColumns:         config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
-				DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
-				ContentCompatible:                config.GetDefaultReplicaConfig().Sink.ContentCompatible,
-			},
-			Scheduler:  config.GetDefaultReplicaConfig().Scheduler,
-			Integrity:  config.GetDefaultReplicaConfig().Integrity,
-			Consistent: config.GetDefaultReplicaConfig().Consistent,
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 			ChangefeedErrorStuckDuration: config.
 				GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
 		},
@@ -269,27 +236,10 @@ func TestChangefeedStateUpdate(t *testing.T) {
 			Filter:  &config.FilterConfig{Rules: []string{"*.*"}},
 			Mounter: &config.MounterConfig{WorkerNum: 16},
 			Sink: &config.SinkConfig{
-<<<<<<< HEAD
 				Protocol:            "open-protocol",
 				AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec,
 			},
 			Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
-=======
-				Terminator:                       putil.AddressOf(config.CRLF),
-				AdvanceTimeoutInSec:              putil.AddressOf(uint(150)),
-				EncoderConcurrency:               config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
-				CSVConfig:                        config.GetDefaultReplicaConfig().Sink.CSVConfig,
-				DateSeparator:                    config.GetDefaultReplicaConfig().Sink.DateSeparator,
-				EnablePartitionSeparator:         config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
-				EnableKafkaSinkV2:                config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
-				OnlyOutputUpdatedColumns:         config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
-				DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
-				ContentCompatible:                config.GetDefaultReplicaConfig().Sink.ContentCompatible,
-			},
-			Consistent: config.GetDefaultReplicaConfig().Consistent,
-			Scheduler:  config.GetDefaultReplicaConfig().Scheduler,
-			Integrity:  config.GetDefaultReplicaConfig().Integrity,
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 			ChangefeedErrorStuckDuration: config.
 				GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
 		},
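Note: the kept side of these hunks leans on util.AddressOf (aliased putil here) because Go has no pointer literals; you cannot write &false or &uint(150) directly. A plausible generic implementation is one line (an assumption; the actual tiflow helper may differ):

package main

import "fmt"

// AddressOf returns a pointer to a copy of v. Pointer-typed config fields
// use this to distinguish "explicitly set to the zero value" from "unset".
func AddressOf[T any](v T) *T { return &v }

func main() {
	enabled := AddressOf(false)
	timeout := AddressOf(uint(150))
	fmt.Println(*enabled, *timeout) // false 150
}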
18 changes: 0 additions & 18 deletions pkg/sink/sink_type.go
@@ -81,21 +81,3 @@ func IsStorageScheme(scheme string) bool {
 	return scheme == FileScheme || scheme == S3Scheme || scheme == GCSScheme ||
 		scheme == GSScheme || scheme == AzblobScheme || scheme == AzureScheme || scheme == CloudStorageNoopScheme
 }
-<<<<<<< HEAD
-=======
-
-// IsPulsarScheme returns true if the scheme belong to pulsar scheme.
-func IsPulsarScheme(scheme string) bool {
-	return scheme == PulsarScheme || scheme == PulsarSSLScheme
-}
-
-// IsBlackHoleScheme returns true if the scheme belong to blackhole scheme.
-func IsBlackHoleScheme(scheme string) bool {
-	return scheme == BlackHoleScheme
-}
-
-// GetScheme returns the scheme of the url.
-func GetScheme(url *url.URL) string {
-	return strings.ToLower(url.Scheme)
-}
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
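Note: the helpers removed here follow the same pattern as the surviving IsStorageScheme: lower-case the URL scheme once, then compare it against known constants. A self-contained usage sketch follows; the scheme constant values are assumptions mirroring the names in this file.

package main

import (
	"fmt"
	"net/url"
	"strings"
)

const (
	BlackHoleScheme = "blackhole"
	PulsarScheme    = "pulsar"
	PulsarSSLScheme = "pulsar+ssl"
)

// GetScheme lower-cases the scheme so comparisons are case-insensitive,
// since url.Parse preserves the scheme's original casing.
func GetScheme(u *url.URL) string {
	return strings.ToLower(u.Scheme)
}

func IsBlackHoleScheme(scheme string) bool {
	return scheme == BlackHoleScheme
}

func IsPulsarScheme(scheme string) bool {
	return scheme == PulsarScheme || scheme == PulsarSSLScheme
}

func main() {
	u, err := url.Parse("Blackhole://ignored")
	if err != nil {
		panic(err)
	}
	fmt.Println(IsBlackHoleScheme(GetScheme(u))) // true
	fmt.Println(IsPulsarScheme("pulsar+ssl"))    // true
}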
66 changes: 0 additions & 66 deletions tests/integration_tests/api_v2/cases.go
@@ -21,12 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/log"
-<<<<<<< HEAD
 	"github.com/pingcap/tiflow/pkg/redo"
-=======
-	"github.com/pingcap/tiflow/pkg/config"
-	"github.com/pingcap/tiflow/pkg/util"
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 	"go.uber.org/zap"
 )
 
@@ -38,17 +33,10 @@ var customReplicaConfig = &ReplicaConfig{
 	ForceReplicate:        false,
 	IgnoreIneligibleTable: false,
 	CheckGCSafePoint:      false,
-<<<<<<< HEAD
 	EnableSyncPoint:    false,
 	BDRMode:            false,
 	SyncPointInterval:  &JSONDuration{11 * time.Minute},
 	SyncPointRetention: &JSONDuration{25 * time.Hour},
-=======
-	BDRMode:            util.AddressOf(false),
-	EnableSyncPoint:    util.AddressOf(false),
-	SyncPointInterval:  util.AddressOf(JSONDuration{duration: 10 * time.Minute}),
-	SyncPointRetention: util.AddressOf(JSONDuration{duration: 24 * time.Hour}),
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 	Filter: &FilterConfig{
 		MySQLReplicationRules: &MySQLReplicationRules{
 			DoTables: []*Table{{"a", "b"}, {"c", "d"}},
@@ -95,25 +83,11 @@ var customReplicaConfig = &ReplicaConfig{
 				[]string{"c"},
 			},
 		},
-<<<<<<< HEAD
 		TxnAtomicity:             "table",
 		EncoderConcurrency:       20,
 		Terminator:               "a",
 		DateSeparator:            "month",
 		EnablePartitionSeparator: true,
-=======
-		TxnAtomicity: "table",
-		Terminator:   "a",
-		CSVConfig: &CSVConfig{
-			Quote:      string(config.DoubleQuoteChar),
-			Delimiter:  config.Comma,
-			NullString: config.NULL,
-		},
-		DateSeparator:            "day",
-		EncoderConcurrency:       util.AddressOf(32),
-		EnablePartitionSeparator: util.AddressOf(true),
-		ContentCompatible:        util.AddressOf(true),
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 	},
 	Consistent: &ConsistentConfig{
 		Level: "",
@@ -122,35 +96,17 @@ var customReplicaConfig = &ReplicaConfig{
 		Storage:        "local://test",
 		UseFileBackend: true,
 	},
-	Consistent: &ConsistentConfig{
-		Level:                 "none",
-		MaxLogSize:            64,
-		FlushIntervalInMs:     2000,
-		MetaFlushIntervalInMs: 200,
-		Storage:               "",
-		UseFileBackend:        false,
-		EncoderWorkerNum:      31,
-		FlushWorkerNum:        18,
-	},
 }
 
 // defaultReplicaConfig check if the default values is changed
 var defaultReplicaConfig = &ReplicaConfig{
 	MemoryQuota:   1024 * 1024 * 1024,
 	CaseSensitive: false,
-<<<<<<< HEAD
 	EnableOldValue:     true,
 	CheckGCSafePoint:   true,
 	EnableSyncPoint:    false,
 	SyncPointInterval:  &JSONDuration{time.Minute * 10},
 	SyncPointRetention: &JSONDuration{time.Hour * 24},
-=======
-	CheckGCSafePoint:   true,
-	EnableSyncPoint:    util.AddressOf(false),
-	SyncPointInterval:  util.AddressOf(JSONDuration{duration: 10 * time.Minute}),
-	SyncPointRetention: util.AddressOf(JSONDuration{duration: 24 * time.Hour}),
-	BDRMode:            util.AddressOf(false),
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 	Filter: &FilterConfig{
 		Rules: []string{"*.*"},
 	},
@@ -159,7 +115,6 @@ var defaultReplicaConfig = &ReplicaConfig{
 	},
 	Sink: &SinkConfig{
 		CSVConfig: &CSVConfig{
-<<<<<<< HEAD
 			Quote:      string("\""),
 			Delimiter:  ",",
 			NullString: "\\N",
@@ -168,17 +123,6 @@ var defaultReplicaConfig = &ReplicaConfig{
 		Terminator:               "\r\n",
 		DateSeparator:            "day",
 		EnablePartitionSeparator: true,
-=======
-			Quote:      string(config.DoubleQuoteChar),
-			Delimiter:  config.Comma,
-			NullString: config.NULL,
-		},
-		Terminator:               "\r\n",
-		DateSeparator:            "day",
-		EncoderConcurrency:       util.AddressOf(32),
-		EnablePartitionSeparator: util.AddressOf(true),
-		ContentCompatible:        util.AddressOf(false),
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 	},
 	Consistent: &ConsistentConfig{
 		Level: "none",
@@ -188,16 +132,6 @@ var defaultReplicaConfig = &ReplicaConfig{
 		Storage:        "",
 		UseFileBackend: false,
 	},
-	Consistent: &ConsistentConfig{
-		Level:                 "none",
-		MaxLogSize:            64,
-		FlushIntervalInMs:     2000,
-		MetaFlushIntervalInMs: 200,
-		EncoderWorkerNum:      16,
-		FlushWorkerNum:        8,
-		Storage:               "",
-		UseFileBackend:        false,
-	},
 }
 
 func testStatus(ctx context.Context, client *CDCRESTClient) error {
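Note: defaultReplicaConfig above exists to pin the expected defaults so the API test fails loudly when a library default changes. A minimal sketch of that guard pattern, with a hypothetical config type and assumed default values:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Config is a hypothetical stand-in for ReplicaConfig.
type Config struct {
	MemoryQuota uint64 `json:"memory_quota"`
	Terminator  string `json:"terminator"`
}

// libraryDefaults plays the role of config.GetDefaultReplicaConfig().
func libraryDefaults() *Config {
	return &Config{MemoryQuota: 1 << 30, Terminator: "\r\n"}
}

func main() {
	// Pinned copy of the defaults; updated deliberately, never implicitly.
	pinned := &Config{MemoryQuota: 1 << 30, Terminator: "\r\n"}

	got, err := json.Marshal(libraryDefaults())
	if err != nil {
		panic(err)
	}
	want, err := json.Marshal(pinned)
	if err != nil {
		panic(err)
	}
	if !bytes.Equal(got, want) {
		panic(fmt.Sprintf("defaults drifted: got %s, want %s", got, want))
	}
	fmt.Println("defaults unchanged")
}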
11 changes: 0 additions & 11 deletions tests/integration_tests/api_v2/model.go
@@ -240,14 +240,8 @@ type SinkConfig struct {
 	TxnAtomicity       string `json:"transaction_atomicity"`
 	EncoderConcurrency int    `json:"encoder_concurrency"`
 	Terminator         string `json:"terminator"`
-<<<<<<< HEAD
 	DateSeparator            string `json:"date_separator"`
 	EnablePartitionSeparator bool   `json:"enable_partition_separator"`
-=======
-	DateSeparator            string `json:"date_separator,omitempty"`
-	EnablePartitionSeparator *bool  `json:"enable_partition_separator,omitempty"`
-	ContentCompatible        *bool  `json:"content_compatible"`
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 }
 
 // CSVConfig denotes the csv config
@@ -281,11 +275,6 @@ type ConsistentConfig struct {
 	MaxLogSize            int64 `json:"max_log_size"`
 	FlushIntervalInMs     int64 `json:"flush_interval"`
 	MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
-<<<<<<< HEAD
-=======
-	EncoderWorkerNum int `json:"encoding_worker_num"`
-	FlushWorkerNum   int `json:"flush_worker_num"`
->>>>>>> 68dc49cba0 (redo(ticdc): fix redo balckhole storage issues (#10023))
 	Storage        string `json:"storage"`
 	UseFileBackend bool   `json:"use_file_backend"`
 }
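Note: the resolution keeps the HEAD field types, but the deleted side shows the newer convention: optional knobs become pointers tagged omitempty, so an unset field disappears from the JSON instead of serializing as a zero value. A small standard-library demonstration with a hypothetical struct:

package main

import (
	"encoding/json"
	"fmt"
)

type SinkConfig struct {
	Terminator               string `json:"terminator"`
	EnablePartitionSeparator *bool  `json:"enable_partition_separator,omitempty"`
}

func main() {
	// Unset pointer: the key is dropped from the output entirely.
	a, _ := json.Marshal(SinkConfig{Terminator: "\r\n"})
	fmt.Println(string(a)) // {"terminator":"\r\n"}

	// Explicit false survives, distinguishable from "not configured".
	f := false
	b, _ := json.Marshal(SinkConfig{Terminator: "\r\n", EnablePartitionSeparator: &f})
	fmt.Println(string(b)) // {"terminator":"\r\n","enable_partition_separator":false}
}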
