kafka(ticdc): make column selector can work normally to filter out columns #9920

Merged · 32 commits · Nov 8, 2023

Commits
347875e
add basic code for the column selector transformer.
3AceShowHand Oct 17, 2023
2b00582
add basic code for the column selector transformer.
3AceShowHand Oct 18, 2023
10420a9
adjust column selector.
3AceShowHand Oct 26, 2023
04a9b67
add column selector.
3AceShowHand Oct 27, 2023
423ec01
Merge branch 'master' into column-selector
3AceShowHand Oct 27, 2023
d7ac573
add first version column selector.
3AceShowHand Oct 27, 2023
21e6043
add more unit test.
3AceShowHand Oct 27, 2023
67919da
add more unit test.
3AceShowHand Oct 27, 2023
c7ee81e
add column selector.
3AceShowHand Oct 27, 2023
df4162d
adjust.
3AceShowHand Oct 30, 2023
9898744
add more about column selector.
3AceShowHand Oct 30, 2023
f66eda4
add check when create the changefeed.
3AceShowHand Oct 30, 2023
17b874a
tiny adjust.
3AceShowHand Oct 30, 2023
8c1d74e
add checksum check.
3AceShowHand Oct 31, 2023
982ae00
make column selector fail as unretryable error.
3AceShowHand Nov 1, 2023
c547766
add kafka column selector integration test.
3AceShowHand Nov 1, 2023
a3bd99f
remove checksum checkere binary
3AceShowHand Nov 1, 2023
a364530
fix make fmt
3AceShowHand Nov 1, 2023
aa2f0ab
fix make fmt
3AceShowHand Nov 1, 2023
a8d5a37
add integration test to run group.
3AceShowHand Nov 1, 2023
87986ce
fix unit test.
3AceShowHand Nov 1, 2023
700fe04
add more unit test.
3AceShowHand Nov 1, 2023
3f2c4e7
fix tests.
3AceShowHand Nov 2, 2023
7456bcd
fix checksum checker.
3AceShowHand Nov 2, 2023
4165ec9
skip unstable unit test.
3AceShowHand Nov 2, 2023
be58bbc
add column selector avro integration test.
3AceShowHand Nov 6, 2023
ca2f99f
fix test.
3AceShowHand Nov 6, 2023
7ec0873
fix avro integration test.
3AceShowHand Nov 6, 2023
6c50fdb
use new topic.
3AceShowHand Nov 6, 2023
b29ae33
fix avro test.
3AceShowHand Nov 7, 2023
631e622
fix avro test.
3AceShowHand Nov 7, 2023
01d056a
fix avro test.
3AceShowHand Nov 7, 2023
11 changes: 10 additions & 1 deletion cdc/api/v2/api_helpers.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/validator"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -518,11 +519,19 @@ func (h APIV2HelpersImpl) getVerifiedTables(
if err != nil {
return nil, nil, err
}

err = eventRouter.VerifyTables(tableInfos)
if err != nil {
return nil, nil, err
}

selectors, err := columnselector.New(replicaConfig)
if err != nil {
return nil, nil, err
}
err = selectors.VerifyTables(tableInfos, eventRouter)
if err != nil {
return nil, nil, err
}

return ineligibleTables, eligibleTables, nil
}
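Taken together, this hunk makes changefeed creation fail early when a column-selector rule would filter out columns that the event router still needs for partitioning. The standalone Go sketch below illustrates that kind of check; selectorRule, verifyDispatcherColumns, and the filepath.Match wildcard handling are assumptions made for the example, not the tiflow implementation.

package main

import (
	"fmt"
	"path/filepath"
)

// selectorRule mirrors the shape of a column-selector rule: table matchers
// plus the column patterns that are kept. The names are illustrative only.
type selectorRule struct {
	Matcher []string // e.g. "test.*"
	Columns []string // e.g. "a", "b"
}

// verifyDispatcherColumns checks that every column the partition dispatcher
// relies on survives the selector rules for the matched table.
func verifyDispatcherColumns(schema, table string, dispatcherCols []string, rules []selectorRule) error {
	qualified := schema + "." + table
	for _, rule := range rules {
		matched := false
		for _, m := range rule.Matcher {
			if ok, _ := filepath.Match(m, qualified); ok {
				matched = true
				break
			}
		}
		if !matched {
			continue
		}
		for _, need := range dispatcherCols {
			kept := false
			for _, c := range rule.Columns {
				if ok, _ := filepath.Match(c, need); ok {
					kept = true
					break
				}
			}
			if !kept {
				return fmt.Errorf("column %q required by the dispatcher is filtered out for %s", need, qualified)
			}
		}
	}
	return nil
}

func main() {
	rules := []selectorRule{{Matcher: []string{"test.*"}, Columns: []string{"a", "b"}}}
	// Column "c" drives partitioning but is not kept by the selector, so the
	// changefeed should be rejected up front.
	fmt.Println(verifyDispatcherColumns("test", "t1", []string{"a", "c"}, rules))
}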
4 changes: 2 additions & 2 deletions cdc/model/schema_storage.go
@@ -341,9 +341,9 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) {
return names, offset, true
}

// ColumnsByNames returns the column offsets of the corresponding columns by names
// OffsetsByNames returns the column offsets of the corresponding columns by names
// If any column does not exist, return false
func (ti *TableInfo) ColumnsByNames(names []string) ([]int, bool) {
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) {
// todo: optimize it
columnOffsets := make(map[string]int, len(ti.Columns))
for _, col := range ti.Columns {
6 changes: 3 additions & 3 deletions cdc/model/schema_storage_test.go
@@ -339,17 +339,17 @@ func TestColumnsByNames(t *testing.T) {
}

names := []string{"col1", "col2", "col3"}
offsets, ok := tableInfo.ColumnsByNames(names)
offsets, ok := tableInfo.OffsetsByNames(names)
require.True(t, ok)
require.Equal(t, []int{0, 1, 2}, offsets)

names = []string{"col2"}
offsets, ok = tableInfo.ColumnsByNames(names)
offsets, ok = tableInfo.OffsetsByNames(names)
require.True(t, ok)
require.Equal(t, []int{1}, offsets)

names = []string{"col1", "col-not-found"}
offsets, ok = tableInfo.ColumnsByNames(names)
offsets, ok = tableInfo.OffsetsByNames(names)
require.False(t, ok)
require.Nil(t, offsets)
}
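The rename from ColumnsByNames to OffsetsByNames makes the contract clearer: the helper resolves a list of column names to their offsets in the table definition and reports failure if any name is missing. A minimal standalone sketch of that contract (not the actual TableInfo method, which reads the table's column metadata):

package main

import "fmt"

// offsetsByNames resolves each requested column name to its offset in the
// table's column list, and reports failure if any name is missing.
func offsetsByNames(columns []string, names []string) ([]int, bool) {
	index := make(map[string]int, len(columns))
	for i, col := range columns {
		index[col] = i
	}
	offsets := make([]int, 0, len(names))
	for _, name := range names {
		i, ok := index[name]
		if !ok {
			return nil, false
		}
		offsets = append(offsets, i)
	}
	return offsets, true
}

func main() {
	cols := []string{"col1", "col2", "col3"}
	fmt.Println(offsetsByNames(cols, []string{"col1", "col2", "col3"}))  // [0 1 2] true
	fmt.Println(offsetsByNames(cols, []string{"col1", "col-not-found"})) // [] false
}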
15 changes: 8 additions & 7 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
@@ -119,13 +119,14 @@ func (s *EventRouter) GetPartitionForRowChange(
row *model.RowChangedEvent,
partitionNum int32,
) (int32, string, error) {
_, partitionDispatcher := s.matchDispatcher(
row.Table.Schema, row.Table.Table,
)
return partitionDispatcher.DispatchRowChangedEvent(
row, partitionNum,
)
return s.GetPartitionDispatcher(row.Table.Schema, row.Table.Table).
DispatchRowChangedEvent(row, partitionNum)
}

// GetPartitionDispatcher returns the partition dispatcher for a specific table.
func (s *EventRouter) GetPartitionDispatcher(schema, table string) partition.Dispatcher {
_, partitionDispatcher := s.matchDispatcher(schema, table)
return partitionDispatcher
}

// VerifyTables return error if any one table route rule is invalid.
@@ -147,7 +148,7 @@ func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error {
"index is not unique when verify the table, table: %v, index: %s", table.TableName, v.IndexName)
}
case *partition.ColumnsDispatcher:
_, ok := table.ColumnsByNames(v.Columns)
_, ok := table.OffsetsByNames(v.Columns)
if !ok {
return cerror.ErrDispatcherFailed.GenWithStack(
"columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns)
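Splitting GetPartitionDispatcher out of GetPartitionForRowChange lets callers look up a table's dispatcher without dispatching an event, which is what the column-selector verification needs in order to learn which columns a route depends on. A toy illustration of that pattern follows; the dispatcher types here are stand-ins, not the real partition package.

package main

import "fmt"

// Minimal stand-ins for the dispatcher types; the real ones live in
// cdc/sink/dmlsink/mq/dispatcher/partition.
type dispatcher interface{ name() string }

type columnsDispatcher struct{ columns []string }

func (d *columnsDispatcher) name() string { return "columns" }

type tableDispatcher struct{}

func (d *tableDispatcher) name() string { return "table" }

// getPartitionDispatcher mimics the new lookup-only helper: it returns the
// dispatcher matched for a table without dispatching any event.
func getPartitionDispatcher(schema, table string) dispatcher {
	if schema == "test" && table == "t1" {
		return &columnsDispatcher{columns: []string{"a", "b"}}
	}
	return &tableDispatcher{}
}

func main() {
	// A verifier can inspect which columns the route depends on, which is
	// how the column-selector check uses the event router.
	switch d := getPartitionDispatcher("test", "t1").(type) {
	case *columnsDispatcher:
		fmt.Println("partitioned by columns:", d.columns)
	default:
		fmt.Println("partitioned by", d.name())
	}
}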
8 changes: 6 additions & 2 deletions cdc/sink/dmlsink/mq/dispatcher/partition/columns.go
@@ -55,7 +55,7 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent,
dispatchCols = row.PreColumns
}

offsets, ok := row.TableInfo.ColumnsByNames(r.Columns)
offsets, ok := row.TableInfo.OffsetsByNames(r.Columns)
if !ok {
log.Error("columns not found when dispatch event",
zap.Any("tableName", row.Table),
@@ -65,7 +65,11 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent,
}

for idx := 0; idx < len(r.Columns); idx++ {
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value)))
col := dispatchCols[offsets[idx]]
if col == nil {
continue
}
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(col.Value)))
}

sum32 := r.hasher.Sum32()
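The nil check is needed because the column selector can now drop columns from an event: filtered slots are left as nil in the column slice, and dereferencing them while hashing would panic. A self-contained sketch of the guarded loop, with hash/fnv standing in for the sink's positional hasher:

package main

import (
	"fmt"
	"hash/fnv"
)

// column is a minimal stand-in for a row column; a nil entry represents a
// column that the column selector filtered out of the event.
type column struct {
	Name  string
	Value string
}

// hashByColumns mirrors the guarded loop above: dispatch columns are looked
// up by offset, and nil holes left by the selector are skipped instead of
// dereferenced.
func hashByColumns(names []string, offsets []int, cols []*column, partitionNum int32) int32 {
	h := fnv.New32a()
	for i, name := range names {
		col := cols[offsets[i]]
		if col == nil {
			continue // filtered out by the column selector
		}
		h.Write([]byte(name))
		h.Write([]byte(col.Value))
	}
	return int32(h.Sum32() % uint32(partitionNum))
}

func main() {
	// Column "b" was dropped by the selector, so its slot is nil.
	cols := []*column{{Name: "a", Value: "1"}, nil}
	fmt.Println(hashByColumns([]string{"a", "b"}, []int{0, 1}, cols, 6))
}

The IndexValueDispatcher hunk below adds the same guard for index-value partitioning.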
6 changes: 5 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go
@@ -73,7 +73,11 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven
"index not found when dispatch event, table: %v, index: %s", row.Table, r.IndexName)
}
for idx := 0; idx < len(names); idx++ {
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value)))
col := dispatchCols[offsets[idx]]
if col == nil {
continue
}
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(col.Value)))
}
}

8 changes: 7 additions & 1 deletion cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -97,6 +98,11 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

trans, err := columnselector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}

encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, options.MaxMessageBytes)
if err != nil {
return nil, errors.Trace(err)
@@ -118,7 +124,7 @@
concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
eventRouter, encoderGroup, protocol, scheme, errCh)
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID))
12 changes: 12 additions & 0 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/config"
@@ -48,6 +49,8 @@ type dmlSink struct {

alive struct {
sync.RWMutex

transformer transformer.Transformer
// eventRouter used to route events to the right topic and partition.
eventRouter *dispatcher.EventRouter
// topicManager used to manage topics.
@@ -77,6 +80,7 @@ func newDMLSink(
adminClient kafka.ClusterAdminClient,
topicManager manager.TopicManager,
eventRouter *dispatcher.EventRouter,
transformer transformer.Transformer,
encoderGroup codec.EncoderGroup,
protocol config.Protocol,
scheme string,
@@ -95,6 +99,7 @@
dead: make(chan struct{}),
scheme: scheme,
}
s.alive.transformer = transformer
s.alive.eventRouter = eventRouter
s.alive.topicManager = topicManager
s.alive.worker = worker
@@ -170,6 +175,13 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
s.cancel(err)
return errors.Trace(err)
}

err = s.alive.transformer.Apply(row)
if err != nil {
s.cancel(err)
return errors.Trace(err)
}

index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
if err != nil {
s.cancel(err)
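With this change the sink holds a transformer.Transformer and applies it to every row before partition routing, so the dispatcher and the encoder only ever see the selected columns. The following self-contained sketch shows that shape; rowEvent, columnSelector, and the prefix-based matcher are illustrative stand-ins rather than the tiflow types.

package main

import (
	"fmt"
	"strings"
)

// rowEvent is a minimal stand-in for a row changed event.
type rowEvent struct {
	Schema, Table string
	Columns       map[string]string
}

// transformer mirrors the shape of the mq transformer interface: it mutates a
// row in place before the event is routed and encoded.
type transformer interface {
	Apply(row *rowEvent) error
}

// columnSelector keeps only the configured columns for matched tables.
type columnSelector struct {
	tablePrefix string
	keep        map[string]struct{}
}

func (s *columnSelector) Apply(row *rowEvent) error {
	if !strings.HasPrefix(row.Schema+"."+row.Table, s.tablePrefix) {
		return nil
	}
	for name := range row.Columns {
		if _, ok := s.keep[name]; !ok {
			delete(row.Columns, name)
		}
	}
	return nil
}

func main() {
	var t transformer = &columnSelector{
		tablePrefix: "test.",
		keep:        map[string]struct{}{"a": {}},
	}
	row := &rowEvent{Schema: "test", Table: "t1", Columns: map[string]string{"a": "1", "b": "2"}}
	// In the sink, Apply runs per row right before partition routing.
	if err := t.Apply(row); err != nil {
		panic(err)
	}
	fmt.Println(row.Columns) // map[a:1]
}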
8 changes: 7 additions & 1 deletion cdc/sink/dmlsink/mq/pulsar_dml_sink.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -104,6 +105,11 @@ func NewPulsarDMLSink(
return nil, errors.Trace(err)
}

trans, err := columnselector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}

encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig,
config.DefaultMaxMessageBytes)
if err != nil {
@@ -119,7 +125,7 @@
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)

s := newDMLSink(ctx, changefeedID, p, nil, topicManager,
eventRouter, encoderGroup, protocol, scheme, errCh)
eventRouter, trans, encoderGroup, protocol, scheme, errCh)

return s, nil
}