From 21e60430a05ff5a6f4552e4c971521031ac62466 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 27 Oct 2023 16:05:32 +0800 Subject: [PATCH] add more unit test. --- cdc/sink/dmlsink/mq/kafka_dml_sink.go | 3 +- .../dmlsink/mq/transformer/column_selector.go | 17 +--- .../mq/transformer/column_selector_test.go | 99 +++++++++++++++++++ 3 files changed, 104 insertions(+), 15 deletions(-) diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index ba029b95a26..21e2ab9ba6b 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -98,8 +98,7 @@ func NewKafkaDMLSink( return nil, errors.Trace(err) } - // todo: what if the `replicaConfig.Sink` is nil. - trans, err := transformer.NewColumnSelector(replicaConfig.Sink.ColumnSelectors) + trans, err := transformer.NewColumnSelector(replicaConfig) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/dmlsink/mq/transformer/column_selector.go b/cdc/sink/dmlsink/mq/transformer/column_selector.go index 6287581075c..286a4a19d89 100644 --- a/cdc/sink/dmlsink/mq/transformer/column_selector.go +++ b/cdc/sink/dmlsink/mq/transformer/column_selector.go @@ -14,7 +14,6 @@ package transformer import ( - "github.com/pingcap/log" filter "github.com/pingcap/tidb/util/table-filter" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -39,7 +38,6 @@ func (c ColumnSelectors) Transform(event *model.RowChangedEvent) error { return selector.Transform(event) } } - log.Panic("column selectors must cover all tables") return nil } @@ -81,13 +79,13 @@ func (s *columnSelector) Transform(event *model.RowChangedEvent) error { } for _, column := range event.Columns { - if s.columnM.Match(column.Name) { + if !s.columnM.Match(column.Name) { column.Value = nil } } for _, column := range event.PreColumns { - if s.columnM.Match(column.Name) { + if !s.columnM.Match(column.Name) { column.Value = nil } } @@ -96,15 +94,8 @@ func (s *columnSelector) Transform(event *model.RowChangedEvent) error { // NewColumnSelector return a column selector func NewColumnSelector(cfg *config.ReplicaConfig) (ColumnSelectors, error) { - // If an event does not match any column selector rules in the config file, - // it won't be transformed, all columns match the *.* rule - ruleConfig := append(cfg.Sink.ColumnSelectors, &config.ColumnSelector{ - Matcher: []string{"*.*"}, - Columns: []string{"*.*"}, - }) - - result := make(ColumnSelectors, 0, len(ruleConfig)) - for _, r := range ruleConfig { + result := make(ColumnSelectors, 0, len(cfg.Sink.ColumnSelectors)) + for _, r := range cfg.Sink.ColumnSelectors { selector, err := newColumnSelector(r, cfg.CaseSensitive) if err != nil { return nil, err diff --git a/cdc/sink/dmlsink/mq/transformer/column_selector_test.go b/cdc/sink/dmlsink/mq/transformer/column_selector_test.go index 3d808515147..479eea3ba93 100644 --- a/cdc/sink/dmlsink/mq/transformer/column_selector_test.go +++ b/cdc/sink/dmlsink/mq/transformer/column_selector_test.go @@ -12,3 +12,102 @@ // limitations under the License. package transformer + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/stretchr/testify/require" +) + +var ( + eventTable1 = &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "table1", + }, + Columns: []*model.Column{ + { + Name: "col1", + Value: []byte("val1"), + }, + { + Name: "col2", + Value: []byte("val2"), + }, + { + Name: "col3", + Value: []byte("val3"), + }, + }, + PreColumns: []*model.Column{ + { + Name: "col1", + Value: []byte("val1"), + }, + { + Name: "col2", + Value: []byte("val2"), + }, + { + Name: "col3", + Value: []byte("val3"), + }, + }, + } +) + +func TestNewColumnSelectorNoRules(t *testing.T) { + // the column selector rule is not set + replicaConfig := config.GetDefaultReplicaConfig() + selectors, err := NewColumnSelector(replicaConfig) + require.NoError(t, err) + require.NotNil(t, selectors) + require.Len(t, selectors, 0) + + err = selectors.Transform(eventTable1) + require.NoError(t, err) + for _, column := range eventTable1.Columns { + require.NotNil(t, column.Value) + } + for _, column := range eventTable1.PreColumns { + require.NotNil(t, column.Value) + } +} + +func TestNewColumnSelector(t *testing.T) { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{ + { + Matcher: []string{"test.*"}, + Columns: []string{"col1", "col2"}, + }, + { + Matcher: []string{"test1.*"}, + Columns: []string{"*", "!a"}, + }, + { + Matcher: []string{"test2.*"}, + Columns: []string{"co*", "!col1"}, + }, + { + Matcher: []string{"test3.*"}, + Columns: []string{"co?1"}, + }, + } + selectors, err := NewColumnSelector(replicaConfig) + require.NoError(t, err) + require.Len(t, selectors, 4) + + err = selectors.Transform(eventTable1) + require.NoError(t, err) + require.Equal(t, []byte("val1"), eventTable1.Columns[0].Value) + require.Equal(t, []byte("val2"), eventTable1.Columns[1].Value) + require.Nil(t, eventTable1.Columns[3].Value) + + require.Equal(t, []byte("val1"), eventTable1.PreColumns[0].Value) + require.Equal(t, []byte("val2"), eventTable1.PreColumns[1].Value) + require.Nil(t, eventTable1.PreColumns[3].Value) + +}