Skip to content

Commit

Permalink
kafka(ticdc): make column selector can work normally to filter out co…
Browse files Browse the repository at this point in the history
…lumns (#9920)

close #9967
  • Loading branch information
3AceShowHand authored Nov 8, 2023
1 parent 2c92d70 commit 055badf
Show file tree
Hide file tree
Showing 28 changed files with 1,058 additions and 22 deletions.
11 changes: 10 additions & 1 deletion cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/validator"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -518,11 +519,19 @@ func (h APIV2HelpersImpl) getVerifiedTables(
if err != nil {
return nil, nil, err
}

err = eventRouter.VerifyTables(tableInfos)
if err != nil {
return nil, nil, err
}

selectors, err := columnselector.New(replicaConfig)
if err != nil {
return nil, nil, err
}
err = selectors.VerifyTables(tableInfos, eventRouter)
if err != nil {
return nil, nil, err
}

return ineligibleTables, eligibleTables, nil
}
4 changes: 2 additions & 2 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,9 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) {
return names, offset, true
}

// ColumnsByNames returns the column offsets of the corresponding columns by names
// OffsetsByNames returns the column offsets of the corresponding columns by names
// If any column does not exist, return false
func (ti *TableInfo) ColumnsByNames(names []string) ([]int, bool) {
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) {
// todo: optimize it
columnOffsets := make(map[string]int, len(ti.Columns))
for _, col := range ti.Columns {
Expand Down
6 changes: 3 additions & 3 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,17 @@ func TestColumnsByNames(t *testing.T) {
}

names := []string{"col1", "col2", "col3"}
offsets, ok := tableInfo.ColumnsByNames(names)
offsets, ok := tableInfo.OffsetsByNames(names)
require.True(t, ok)
require.Equal(t, []int{0, 1, 2}, offsets)

names = []string{"col2"}
offsets, ok = tableInfo.ColumnsByNames(names)
offsets, ok = tableInfo.OffsetsByNames(names)
require.True(t, ok)
require.Equal(t, []int{1}, offsets)

names = []string{"col1", "col-not-found"}
offsets, ok = tableInfo.ColumnsByNames(names)
offsets, ok = tableInfo.OffsetsByNames(names)
require.False(t, ok)
require.Nil(t, offsets)
}
15 changes: 8 additions & 7 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,14 @@ func (s *EventRouter) GetPartitionForRowChange(
row *model.RowChangedEvent,
partitionNum int32,
) (int32, string, error) {
_, partitionDispatcher := s.matchDispatcher(
row.Table.Schema, row.Table.Table,
)
return s.GetPartitionDispatcher(row.Table.Schema, row.Table.Table).
DispatchRowChangedEvent(row, partitionNum)
}

return partitionDispatcher.DispatchRowChangedEvent(
row, partitionNum,
)
// GetPartitionDispatcher returns the partition dispatcher for a specific table.
func (s *EventRouter) GetPartitionDispatcher(schema, table string) partition.Dispatcher {
_, partitionDispatcher := s.matchDispatcher(schema, table)
return partitionDispatcher
}

// VerifyTables return error if any one table route rule is invalid.
Expand All @@ -147,7 +148,7 @@ func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error {
"index is not unique when verify the table, table: %v, index: %s", table.TableName, v.IndexName)
}
case *partition.ColumnsDispatcher:
_, ok := table.ColumnsByNames(v.Columns)
_, ok := table.OffsetsByNames(v.Columns)
if !ok {
return cerror.ErrDispatcherFailed.GenWithStack(
"columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns)
Expand Down
8 changes: 6 additions & 2 deletions cdc/sink/dmlsink/mq/dispatcher/partition/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent,
dispatchCols = row.PreColumns
}

offsets, ok := row.TableInfo.ColumnsByNames(r.Columns)
offsets, ok := row.TableInfo.OffsetsByNames(r.Columns)
if !ok {
log.Error("columns not found when dispatch event",
zap.Any("tableName", row.Table),
Expand All @@ -65,7 +65,11 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent,
}

for idx := 0; idx < len(r.Columns); idx++ {
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value)))
col := dispatchCols[offsets[idx]]
if col == nil {
continue
}
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(col.Value)))
}

sum32 := r.hasher.Sum32()
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven
"index not found when dispatch event, table: %v, index: %s", row.Table, r.IndexName)
}
for idx := 0; idx < len(names); idx++ {
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value)))
col := dispatchCols[offsets[idx]]
if col == nil {
continue
}
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(col.Value)))
}
}

Expand Down
8 changes: 7 additions & 1 deletion cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -97,6 +98,11 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

trans, err := columnselector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}

encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, options.MaxMessageBytes)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -118,7 +124,7 @@ func NewKafkaDMLSink(
concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
eventRouter, encoderGroup, protocol, scheme, errCh)
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID))
Expand Down
12 changes: 12 additions & 0 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/config"
Expand All @@ -48,6 +49,8 @@ type dmlSink struct {

alive struct {
sync.RWMutex

transformer transformer.Transformer
// eventRouter used to route events to the right topic and partition.
eventRouter *dispatcher.EventRouter
// topicManager used to manage topics.
Expand Down Expand Up @@ -77,6 +80,7 @@ func newDMLSink(
adminClient kafka.ClusterAdminClient,
topicManager manager.TopicManager,
eventRouter *dispatcher.EventRouter,
transformer transformer.Transformer,
encoderGroup codec.EncoderGroup,
protocol config.Protocol,
scheme string,
Expand All @@ -95,6 +99,7 @@ func newDMLSink(
dead: make(chan struct{}),
scheme: scheme,
}
s.alive.transformer = transformer
s.alive.eventRouter = eventRouter
s.alive.topicManager = topicManager
s.alive.worker = worker
Expand Down Expand Up @@ -170,6 +175,13 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
s.cancel(err)
return errors.Trace(err)
}

err = s.alive.transformer.Apply(row)
if err != nil {
s.cancel(err)
return errors.Trace(err)
}

index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
if err != nil {
s.cancel(err)
Expand Down
8 changes: 7 additions & 1 deletion cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -104,6 +105,11 @@ func NewPulsarDMLSink(
return nil, errors.Trace(err)
}

trans, err := columnselector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}

encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig,
config.DefaultMaxMessageBytes)
if err != nil {
Expand All @@ -119,7 +125,7 @@ func NewPulsarDMLSink(
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)

s := newDMLSink(ctx, changefeedID, p, nil, topicManager,
eventRouter, encoderGroup, protocol, scheme, errCh)
eventRouter, trans, encoderGroup, protocol, scheme, errCh)

return s, nil
}
Loading

0 comments on commit 055badf

Please sign in to comment.