diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go index 93b40f9ac8b..16c0f30c9cd 100644 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/column_selector" "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -104,6 +105,11 @@ func NewPulsarDMLSink( return nil, errors.Trace(err) } + trans, err := column_selector.New(replicaConfig) + if err != nil { + return nil, errors.Trace(err) + } + encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, config.DefaultMaxMessageBytes) if err != nil { @@ -119,7 +125,7 @@ func NewPulsarDMLSink( encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) s := newDMLSink(ctx, changefeedID, p, nil, topicManager, - eventRouter, encoderGroup, protocol, scheme, errCh) + eventRouter, trans, encoderGroup, protocol, scheme, errCh) return s, nil } diff --git a/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector.go b/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector.go index 68b2594c380..cdf51b81b5f 100644 --- a/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector.go +++ b/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector.go @@ -58,17 +58,18 @@ func (s *selector) Apply(event *model.RowChangedEvent) error { return nil } - for _, column := range event.Columns { - if !s.columnM.MatchColumn(column.Name) { - column.Value = nil + for i := 0; i < len(event.Columns); i++ { + if !s.columnM.MatchColumn(event.Columns[i].Name) { + event.Columns[i] = nil } } - for _, column := range event.PreColumns { - if !s.columnM.MatchColumn(column.Name) { - column.Value = nil + for i := 0; i < len(event.PreColumns); i++ { + if !s.columnM.MatchColumn(event.PreColumns[i].Name) { + event.PreColumns[i] = nil } } + return nil } diff --git a/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector_test.go b/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector_test.go index 19d6e6fcda5..f8be4060846 100644 --- a/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector_test.go +++ b/cdc/sink/dmlsink/mq/transformer/column_selector/column_selector_test.go @@ -21,42 +21,40 @@ import ( "github.com/stretchr/testify/require" ) -var ( - event = &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: "test", - Table: "table1", +var event = &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "table1", + }, + Columns: []*model.Column{ + { + Name: "col1", + Value: []byte("val1"), }, - Columns: []*model.Column{ - { - Name: "col1", - Value: []byte("val1"), - }, - { - Name: "col2", - Value: []byte("val2"), - }, - { - Name: "col3", - Value: []byte("val3"), - }, + { + Name: "col2", + Value: []byte("val2"), }, - PreColumns: []*model.Column{ - { - Name: "col1", - Value: []byte("val1"), - }, - { - Name: "col2", - Value: []byte("val2"), - }, - { - Name: "col3", - Value: []byte("val3"), - }, + { + Name: "col3", + Value: []byte("val3"), }, - } -) + }, + PreColumns: []*model.Column{ + { + Name: "col1", + Value: []byte("val1"), + }, + { + Name: "col2", + Value: []byte("val2"), + }, + { + Name: "col3", + Value: []byte("val3"), + }, + }, +} func TestNewColumnSelectorNoRules(t *testing.T) { // the column selector is not set @@ -105,11 +103,11 @@ func TestNewColumnSelector(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte("val1"), event.Columns[0].Value) require.Equal(t, []byte("val2"), event.Columns[1].Value) - require.Nil(t, event.Columns[2].Value) + require.Nil(t, event.Columns[2]) require.Equal(t, []byte("val1"), event.PreColumns[0].Value) require.Equal(t, []byte("val2"), event.PreColumns[1].Value) - require.Nil(t, event.PreColumns[2].Value) + require.Nil(t, event.PreColumns[2]) event = &model.RowChangedEvent{ Table: &model.TableName{ @@ -134,7 +132,7 @@ func TestNewColumnSelector(t *testing.T) { // the first column `a` is filter out, set to nil. err = selectors.Apply(event) require.NoError(t, err) - require.Nil(t, event.Columns[0].Value) + require.Nil(t, event.Columns[0]) require.Equal(t, []byte("b"), event.Columns[1].Value) require.Equal(t, []byte("c"), event.Columns[2].Value) @@ -166,7 +164,7 @@ func TestNewColumnSelector(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte("col"), event.Columns[0].Value) require.Equal(t, []byte("col1"), event.Columns[1].Value) - require.Nil(t, event.Columns[2].Value) + require.Nil(t, event.Columns[2]) require.Equal(t, []byte("col3"), event.Columns[3].Value) event = &model.RowChangedEvent{ @@ -195,8 +193,8 @@ func TestNewColumnSelector(t *testing.T) { } err = selectors.Apply(event) require.NoError(t, err) - require.Nil(t, event.Columns[0].Value) + require.Nil(t, event.Columns[0]) require.Equal(t, []byte("col1"), event.Columns[1].Value) - require.Nil(t, event.Columns[2].Value) + require.Nil(t, event.Columns[2]) require.Equal(t, []byte("coA1"), event.Columns[3].Value) }