Skip to content

Commit

Permalink
adjust.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 30, 2023
1 parent c7ee81e commit df4162d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 46 deletions.
8 changes: 7 additions & 1 deletion cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/column_selector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -104,6 +105,11 @@ func NewPulsarDMLSink(
return nil, errors.Trace(err)
}

trans, err := column_selector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}

encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig,
config.DefaultMaxMessageBytes)
if err != nil {
Expand All @@ -119,7 +125,7 @@ func NewPulsarDMLSink(
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)

s := newDMLSink(ctx, changefeedID, p, nil, topicManager,
eventRouter, encoderGroup, protocol, scheme, errCh)
eventRouter, trans, encoderGroup, protocol, scheme, errCh)

return s, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ func (s *selector) Apply(event *model.RowChangedEvent) error {
return nil
}

for _, column := range event.Columns {
if !s.columnM.MatchColumn(column.Name) {
column.Value = nil
for i := 0; i < len(event.Columns); i++ {
if !s.columnM.MatchColumn(event.Columns[i].Name) {
event.Columns[i] = nil
}
}

for _, column := range event.PreColumns {
if !s.columnM.MatchColumn(column.Name) {
column.Value = nil
for i := 0; i < len(event.PreColumns); i++ {
if !s.columnM.MatchColumn(event.PreColumns[i].Name) {
event.PreColumns[i] = nil
}
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,40 @@ import (
"github.com/stretchr/testify/require"
)

var (
event = &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "table1",
var event = &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "table1",
},
Columns: []*model.Column{
{
Name: "col1",
Value: []byte("val1"),
},
Columns: []*model.Column{
{
Name: "col1",
Value: []byte("val1"),
},
{
Name: "col2",
Value: []byte("val2"),
},
{
Name: "col3",
Value: []byte("val3"),
},
{
Name: "col2",
Value: []byte("val2"),
},
PreColumns: []*model.Column{
{
Name: "col1",
Value: []byte("val1"),
},
{
Name: "col2",
Value: []byte("val2"),
},
{
Name: "col3",
Value: []byte("val3"),
},
{
Name: "col3",
Value: []byte("val3"),
},
}
)
},
PreColumns: []*model.Column{
{
Name: "col1",
Value: []byte("val1"),
},
{
Name: "col2",
Value: []byte("val2"),
},
{
Name: "col3",
Value: []byte("val3"),
},
},
}

func TestNewColumnSelectorNoRules(t *testing.T) {
// the column selector is not set
Expand Down Expand Up @@ -105,11 +103,11 @@ func TestNewColumnSelector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte("val1"), event.Columns[0].Value)
require.Equal(t, []byte("val2"), event.Columns[1].Value)
require.Nil(t, event.Columns[2].Value)
require.Nil(t, event.Columns[2])

require.Equal(t, []byte("val1"), event.PreColumns[0].Value)
require.Equal(t, []byte("val2"), event.PreColumns[1].Value)
require.Nil(t, event.PreColumns[2].Value)
require.Nil(t, event.PreColumns[2])

event = &model.RowChangedEvent{
Table: &model.TableName{
Expand All @@ -134,7 +132,7 @@ func TestNewColumnSelector(t *testing.T) {
// the first column `a` is filter out, set to nil.
err = selectors.Apply(event)
require.NoError(t, err)
require.Nil(t, event.Columns[0].Value)
require.Nil(t, event.Columns[0])
require.Equal(t, []byte("b"), event.Columns[1].Value)
require.Equal(t, []byte("c"), event.Columns[2].Value)

Expand Down Expand Up @@ -166,7 +164,7 @@ func TestNewColumnSelector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte("col"), event.Columns[0].Value)
require.Equal(t, []byte("col1"), event.Columns[1].Value)
require.Nil(t, event.Columns[2].Value)
require.Nil(t, event.Columns[2])
require.Equal(t, []byte("col3"), event.Columns[3].Value)

event = &model.RowChangedEvent{
Expand Down Expand Up @@ -195,8 +193,8 @@ func TestNewColumnSelector(t *testing.T) {
}
err = selectors.Apply(event)
require.NoError(t, err)
require.Nil(t, event.Columns[0].Value)
require.Nil(t, event.Columns[0])
require.Equal(t, []byte("col1"), event.Columns[1].Value)
require.Nil(t, event.Columns[2].Value)
require.Nil(t, event.Columns[2])
require.Equal(t, []byte("coA1"), event.Columns[3].Value)
}

0 comments on commit df4162d

Please sign in to comment.