From 0334347ff6288f38fc20c5d7b2e01fc7fcb6d3ad Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 28 Sep 2023 16:31:23 +0800 Subject: [PATCH 01/15] index-value dispatcher support set index names. --- .../dmlsink/mq/dispatcher/event_router.go | 8 ++-- .../mq/dispatcher/event_router_test.go | 15 ++++--- .../mq/dispatcher/partition/default.go | 2 +- .../mq/dispatcher/partition/default_test.go | 2 +- .../mq/dispatcher/partition/dispatcher.go | 2 +- .../mq/dispatcher/partition/index_value.go | 43 +++++++++++++++---- .../dispatcher/partition/index_value_test.go | 5 ++- .../dmlsink/mq/dispatcher/partition/key.go | 4 +- .../mq/dispatcher/partition/key_test.go | 2 +- .../dmlsink/mq/dispatcher/partition/table.go | 4 +- .../mq/dispatcher/partition/table_test.go | 2 +- .../dmlsink/mq/dispatcher/partition/ts.go | 4 +- .../mq/dispatcher/partition/ts_test.go | 2 +- cdc/sink/dmlsink/mq/mq_dml_sink.go | 5 ++- cmd/kafka-consumer/main.go | 5 ++- pkg/config/sink.go | 4 +- 16 files changed, 74 insertions(+), 35 deletions(-) diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go index 478054557af..7b12c9117db 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go @@ -67,7 +67,7 @@ func NewEventRouter( f = filter.CaseInsensitive(f) } - d := getPartitionDispatcher(ruleConfig.PartitionRule, scheme) + d := getPartitionDispatcher(ruleConfig.PartitionRule, scheme, ruleConfig.IndexName) t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol, scheme) if err != nil { return nil, err @@ -116,7 +116,7 @@ func (s *EventRouter) GetTopicForDDL(ddl *model.DDLEvent) string { func (s *EventRouter) GetPartitionForRowChange( row *model.RowChangedEvent, partitionNum int32, -) (int32, string) { +) (int32, string, error) { _, partitionDispatcher := s.matchDispatcher( row.Table.Schema, row.Table.Table, ) @@ -176,7 +176,7 @@ func (s *EventRouter) matchDispatcher( } // getPartitionDispatcher returns the partition dispatcher for a specific partition rule. -func getPartitionDispatcher(rule string, scheme string) partition.Dispatcher { +func getPartitionDispatcher(rule string, scheme string, indexName string) partition.Dispatcher { switch strings.ToLower(rule) { case "default": return partition.NewDefaultDispatcher() @@ -186,7 +186,7 @@ func getPartitionDispatcher(rule string, scheme string) partition.Dispatcher { return partition.NewTableDispatcher() case "index-value", "rowid": log.Warn("rowid is deprecated, please use index-value instead.") - return partition.NewIndexValueDispatcher() + return partition.NewIndexValueDispatcher(indexName) default: } diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go index 02fb84b913f..f6b7bae3eb5 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go @@ -192,7 +192,7 @@ func TestGetPartitionForRowChange(t *testing.T) { d, err := NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "test", sink.KafkaScheme) require.NoError(t, err) - p, _ := d.GetPartitionForRowChange(&model.RowChangedEvent{ + p, _, err := d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "test_default1", Table: "table"}, Columns: []*model.Column{ { @@ -204,8 +204,9 @@ func TestGetPartitionForRowChange(t *testing.T) { IndexColumns: [][]int{{0}}, }, 16) require.Equal(t, int32(14), p) + require.NoError(t, err) - p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{ + p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "test_default2", Table: "table"}, Columns: []*model.Column{ { @@ -217,14 +218,16 @@ func TestGetPartitionForRowChange(t *testing.T) { IndexColumns: [][]int{{0}}, }, 16) require.Equal(t, int32(0), p) + require.NoError(t, err) - p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{ + p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "test_table", Table: "table"}, CommitTs: 1, }, 16) require.Equal(t, int32(15), p) + require.NoError(t, err) - p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{ + p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "test_index_value", Table: "table"}, Columns: []*model.Column{ { @@ -239,12 +242,14 @@ func TestGetPartitionForRowChange(t *testing.T) { }, }, 10) require.Equal(t, int32(1), p) + require.NoError(t, err) - p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{ + p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "a", Table: "table"}, CommitTs: 1, }, 2) require.Equal(t, int32(1), p) + require.NoError(t, err) } func TestGetTopicForDDL(t *testing.T) { diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/default.go b/cdc/sink/dmlsink/mq/dispatcher/partition/default.go index fb6ceb03689..291b49caf8a 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/default.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/default.go @@ -31,6 +31,6 @@ func NewDefaultDispatcher() *DefaultDispatcher { // DispatchRowChangedEvent returns the target partition to which // a row changed event should be dispatched. -func (d *DefaultDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) { +func (d *DefaultDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) { return d.tbd.DispatchRowChangedEvent(row, partitionNum) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go index b0319241e10..7fdc72a88f8 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go @@ -38,6 +38,6 @@ func TestDefaultDispatcher(t *testing.T) { IndexColumns: [][]int{{0}}, } - targetPartition, _ := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3) + targetPartition, _, _ := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3) require.Equal(t, int32(0), targetPartition) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/dispatcher.go b/cdc/sink/dmlsink/mq/dispatcher/partition/dispatcher.go index c0eb739734f..f06077ee968 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/dispatcher.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/dispatcher.go @@ -22,5 +22,5 @@ type Dispatcher interface { // DispatchRowChangedEvent returns an index of partitions or a partition key // according to RowChangedEvent. // Concurrency Note: This method is thread-safe. - DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) + DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 4476a905466..91e6b229474 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" ) @@ -25,18 +26,21 @@ import ( type IndexValueDispatcher struct { hasher *hash.PositionInertia lock sync.Mutex + + indexName string } // NewIndexValueDispatcher creates a IndexValueDispatcher. -func NewIndexValueDispatcher() *IndexValueDispatcher { +func NewIndexValueDispatcher(indexName string) *IndexValueDispatcher { return &IndexValueDispatcher{ - hasher: hash.NewPositionInertia(), + hasher: hash.NewPositionInertia(), + indexName: indexName, } } // DispatchRowChangedEvent returns the target partition to which // a row changed event should be dispatched. -func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) { +func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) { r.lock.Lock() defer r.lock.Unlock() r.hasher.Reset() @@ -46,14 +50,35 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven if len(row.Columns) == 0 { dispatchCols = row.PreColumns } - for _, col := range dispatchCols { - if col == nil { - continue + + if r.indexName != "" { + indexColumns := make(map[string]int) + for _, index := range row.TableInfo.Indices { + if index.Name.O == r.indexName { + for _, col := range index.Columns { + indexColumns[col.Name.O] = col.Offset + } + break + } + } + if len(indexColumns) == 0 { + return 0, "", errors.New("cannot found the target index columns") } - if col.Flag.IsHandleKey() { - r.hasher.Write([]byte(col.Name), []byte(model.ColumnValueString(col.Value))) + + for colName, offset := range indexColumns { + r.hasher.Write([]byte(colName), []byte(model.ColumnValueString(dispatchCols[offset].Value))) + } + } else { + for _, col := range dispatchCols { + if col == nil { + continue + } + if col.Flag.IsHandleKey() { + r.hasher.Write([]byte(col.Name), []byte(model.ColumnValueString(col.Value))) + } } } + sum32 := r.hasher.Sum32() - return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10) + return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10), nil } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go index 9d702163405..adf9ba6373e 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go @@ -147,9 +147,10 @@ func TestIndexValueDispatcher(t *testing.T) { }, }, expectPartition: 2}, } - p := NewIndexValueDispatcher() + p := NewIndexValueDispatcher("") for _, tc := range testCases { - index, _ := p.DispatchRowChangedEvent(tc.row, 16) + index, _, err := p.DispatchRowChangedEvent(tc.row, 16) require.Equal(t, tc.expectPartition, index) + require.NoError(t, err) } } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/key.go b/cdc/sink/dmlsink/mq/dispatcher/partition/key.go index 6f41371fbf1..ea0c99486c6 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/key.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/key.go @@ -32,6 +32,6 @@ func NewKeyDispatcher(partitionKey string) *KeyDispatcher { // DispatchRowChangedEvent returns the target partition to which // a row changed event should be dispatched. -func (t *KeyDispatcher) DispatchRowChangedEvent(*model.RowChangedEvent, int32) (int32, string) { - return 0, t.partitionKey +func (t *KeyDispatcher) DispatchRowChangedEvent(*model.RowChangedEvent, int32) (int32, string, error) { + return 0, t.partitionKey, nil } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go index 37843904795..ec397070374 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go @@ -39,7 +39,7 @@ func TestKeyDispatcher_DispatchRowChangedEvent(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { d := NewKeyDispatcher(tt.partitionKey) - gotPartition, gotKey := d.DispatchRowChangedEvent(tt.rowChangedEvt, 0) + gotPartition, gotKey, _ := d.DispatchRowChangedEvent(tt.rowChangedEvt, 0) if gotPartition != tt.wantPartition { t.Errorf("DispatchRowChangedEvent() gotPartition = %v, want %v", gotPartition, tt.wantPartition) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/table.go b/cdc/sink/dmlsink/mq/dispatcher/partition/table.go index a577d7bac5e..0e9ad15653d 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/table.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/table.go @@ -36,11 +36,11 @@ func NewTableDispatcher() *TableDispatcher { // DispatchRowChangedEvent returns the target partition to which // a row changed event should be dispatched. -func (t *TableDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) { +func (t *TableDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) { t.lock.Lock() defer t.lock.Unlock() t.hasher.Reset() // distribute partition by table t.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) - return int32(t.hasher.Sum32() % uint32(partitionNum)), row.Table.String() + return int32(t.hasher.Sum32() % uint32(partitionNum)), row.Table.String(), nil } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go index 468dae23994..3bd9c612731 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go @@ -79,7 +79,7 @@ func TestTableDispatcher(t *testing.T) { } p := NewTableDispatcher() for _, tc := range testCases { - index, _ := p.DispatchRowChangedEvent(tc.row, 16) + index, _, _ := p.DispatchRowChangedEvent(tc.row, 16) require.Equal(t, tc.expectPartition, index) } } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/ts.go b/cdc/sink/dmlsink/mq/dispatcher/partition/ts.go index 9ba3dee3a35..45c3c9d45e3 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/ts.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/ts.go @@ -29,6 +29,6 @@ func NewTsDispatcher() *TsDispatcher { // DispatchRowChangedEvent returns the target partition to which // a row changed event should be dispatched. -func (t *TsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) { - return int32(row.CommitTs % uint64(partitionNum)), fmt.Sprintf("%d", row.CommitTs) +func (t *TsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) { + return int32(row.CommitTs % uint64(partitionNum)), fmt.Sprintf("%d", row.CommitTs), nil } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go index c73615c458f..49604cc096e 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go @@ -72,7 +72,7 @@ func TestTsDispatcher(t *testing.T) { } p := &TsDispatcher{} for _, tc := range testCases { - index, _ := p.DispatchRowChangedEvent(tc.row, 16) + index, _, _ := p.DispatchRowChangedEvent(tc.row, 16) require.Equal(t, tc.expectPartition, index) } } diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index b75cd0eefed..92ce2dfdc1e 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -150,7 +150,10 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa if err != nil { return errors.Trace(err) } - index, key := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum) + index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum) + if err != nil { + return errors.Trace(err) + } // This never be blocked because this is an unbounded channel. s.alive.worker.msgChan.In() <- mqEvent{ key: TopicPartitionKey{ diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index dd3718c9ce4..223df544d27 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -662,7 +662,10 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram } if c.eventRouter != nil { - target, _ := c.eventRouter.GetPartitionForRowChange(row, c.option.partitionNum) + target, _, err := c.eventRouter.GetPartitionForRowChange(row, c.option.partitionNum) + if err != nil { + return errors.Trace(err) + } if partition != target { log.Panic("RowChangedEvent dispatched to wrong partition", zap.Int32("obtained", partition), diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 7aee68ed128..4303881407e 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -292,7 +292,9 @@ type DispatchRule struct { // PartitionRule is an alias added for DispatcherRule to mitigate confusions. // In the future release, the DispatcherRule is expected to be removed . PartitionRule string `toml:"partition" json:"partition"` - TopicRule string `toml:"topic" json:"topic"` + IndexName string `toml:"index-name" json:"index-name"` + + TopicRule string `toml:"topic" json:"topic"` } // ColumnSelector represents a column selector for a table. From 1c386e95ff11f48482b646136f04f1a0d5f0ab5d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sat, 7 Oct 2023 11:52:48 +0800 Subject: [PATCH 02/15] update swagger. --- docs/swagger/docs.go | 3 +++ docs/swagger/swagger.json | 3 +++ docs/swagger/swagger.yaml | 2 ++ 3 files changed, 8 insertions(+) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 2142d0c95de..c2f5e52d8fa 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1428,6 +1428,9 @@ var doc = `{ "description": "Deprecated, please use PartitionRule.", "type": "string" }, + "index-name": { + "type": "string" + }, "matcher": { "type": "array", "items": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 6ef5d9254c6..009460ab198 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1409,6 +1409,9 @@ "description": "Deprecated, please use PartitionRule.", "type": "string" }, + "index-name": { + "type": "string" + }, "matcher": { "type": "array", "items": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index ba8053f5feb..1c9ed74d5d0 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -57,6 +57,8 @@ definitions: dispatcher: description: Deprecated, please use PartitionRule. type: string + index-name: + type: string matcher: items: type: string From 18b24888972a542ff45e0721206ad4bdcbfd57f2 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sat, 7 Oct 2023 15:06:59 +0800 Subject: [PATCH 03/15] add more unit test. --- cdc/model/sink.go | 16 +++++++ cdc/model/sink_test.go | 33 +++++++++++++ .../mq/dispatcher/event_router_test.go | 10 ++-- .../mq/dispatcher/partition/default_test.go | 3 +- .../mq/dispatcher/partition/index_value.go | 28 ++++------- .../dispatcher/partition/index_value_test.go | 46 +++++++++++++++++++ .../mq/dispatcher/partition/key_test.go | 4 +- .../mq/dispatcher/partition/table_test.go | 3 +- .../mq/dispatcher/partition/ts_test.go | 3 +- cdc/sink/dmlsink/mq/mq_dml_sink.go | 4 +- errors.toml | 25 ++-------- pkg/errors/cdc_errors.go | 26 ++--------- 12 files changed, 131 insertions(+), 70 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index decb3d1e8cc..aef82a8fcc2 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -507,6 +507,22 @@ func (r *RowChangedEvent) ApproximateBytes() int { return size } +// IndexByName returns the index columns and offsets of the corresponding index by name +func (r *RowChangedEvent) IndexByName(name string) ([]string, []int, bool) { + for _, index := range r.TableInfo.Indices { + if index.Name.O == name { + names := make([]string, 0, len(index.Columns)) + offset := make([]int, 0, len(index.Columns)) + for _, col := range index.Columns { + names = append(names, col.Name.O) + offset = append(offset, col.Offset) + } + return names, offset, true + } + } + return nil, nil, false +} + // Column represents a column value in row changed event type Column struct { Name string `json:"name" msg:"name"` diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index eb899d6c109..334bf3aed58 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -609,3 +609,36 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { require.NoError(t, err) require.Len(t, txn.Rows, 1) } + +func TestIndexByName(t *testing.T) { + event := &RowChangedEvent{ + TableInfo: &TableInfo{ + TableInfo: &timodel.TableInfo{ + Indices: []*timodel.IndexInfo{ + { + Name: timodel.CIStr{ + O: "idx1", + }, + Columns: []*timodel.IndexColumn{ + { + Name: timodel.CIStr{ + O: "col1", + }, + }, + }, + }, + }, + }, + }, + } + + names, offsets, ok := event.IndexByName("idx2") + require.False(t, ok) + require.Nil(t, names) + require.Nil(t, offsets) + + names, offsets, ok = event.IndexByName("idx1") + require.True(t, ok) + require.Equal(t, []string{"col1"}, names) + require.Equal(t, []int{0}, offsets) +} diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go index f6b7bae3eb5..1c9cc357801 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go @@ -203,8 +203,8 @@ func TestGetPartitionForRowChange(t *testing.T) { }, IndexColumns: [][]int{{0}}, }, 16) - require.Equal(t, int32(14), p) require.NoError(t, err) + require.Equal(t, int32(14), p) p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "test_default2", Table: "table"}, @@ -217,15 +217,15 @@ func TestGetPartitionForRowChange(t *testing.T) { }, IndexColumns: [][]int{{0}}, }, 16) - require.Equal(t, int32(0), p) require.NoError(t, err) + require.Equal(t, int32(0), p) p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "test_table", Table: "table"}, CommitTs: 1, }, 16) - require.Equal(t, int32(15), p) require.NoError(t, err) + require.Equal(t, int32(15), p) p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "test_index_value", Table: "table"}, @@ -241,15 +241,15 @@ func TestGetPartitionForRowChange(t *testing.T) { }, }, }, 10) - require.Equal(t, int32(1), p) require.NoError(t, err) + require.Equal(t, int32(1), p) p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{ Table: &model.TableName{Schema: "a", Table: "table"}, CommitTs: 1, }, 2) - require.Equal(t, int32(1), p) require.NoError(t, err) + require.Equal(t, int32(1), p) } func TestGetTopicForDDL(t *testing.T) { diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go index 7fdc72a88f8..e9b246c494c 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go @@ -38,6 +38,7 @@ func TestDefaultDispatcher(t *testing.T) { IndexColumns: [][]int{{0}}, } - targetPartition, _, _ := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3) + targetPartition, _, err := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3) + require.NoError(t, err) require.Equal(t, int32(0), targetPartition) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 91e6b229474..6766b9a5275 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -51,24 +51,8 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven dispatchCols = row.PreColumns } - if r.indexName != "" { - indexColumns := make(map[string]int) - for _, index := range row.TableInfo.Indices { - if index.Name.O == r.indexName { - for _, col := range index.Columns { - indexColumns[col.Name.O] = col.Offset - } - break - } - } - if len(indexColumns) == 0 { - return 0, "", errors.New("cannot found the target index columns") - } - - for colName, offset := range indexColumns { - r.hasher.Write([]byte(colName), []byte(model.ColumnValueString(dispatchCols[offset].Value))) - } - } else { + // the most normal case, index-name is not set, use the handle key columns. + if r.indexName == "" { for _, col := range dispatchCols { if col == nil { continue @@ -77,6 +61,14 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven r.hasher.Write([]byte(col.Name), []byte(model.ColumnValueString(col.Value))) } } + } else { + names, offsets, ok := row.IndexByName(r.indexName) + if !ok { + return 0, "", errors.ErrDispatcherRuntime.GenWithStack("index not found: %s", r.indexName) + } + for idx := 0; idx < len(names); idx++ { + r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) + } } sum32 := r.hasher.Sum32() diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go index adf9ba6373e..a9cd51ac1a9 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go @@ -16,7 +16,9 @@ package partition import ( "testing" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) @@ -154,3 +156,47 @@ func TestIndexValueDispatcher(t *testing.T) { require.NoError(t, err) } } + +func TestIndexValueDispatcherWithIndexName(t *testing.T) { + t.Parallel() + + event := &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t1", + }, + TableInfo: &model.TableInfo{ + TableInfo: &timodel.TableInfo{ + Indices: []*timodel.IndexInfo{ + { + Name: timodel.CIStr{ + O: "index1", + }, + Columns: []*timodel.IndexColumn{ + { + Name: timodel.CIStr{ + O: "a", + }, + }, + }, + }, + }, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: 11, + }, + }, + } + + p := NewIndexValueDispatcher("index2") + _, _, err := p.DispatchRowChangedEvent(event, 16) + require.ErrorIs(t, err, errors.ErrDispatcherRuntime) + + p = NewIndexValueDispatcher("index1") + index, _, err := p.DispatchRowChangedEvent(event, 16) + require.NoError(t, err) + require.Equal(t, int32(2), index) +} diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go index ec397070374..380ac1e4485 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" ) func TestKeyDispatcher_DispatchRowChangedEvent(t *testing.T) { @@ -39,7 +40,8 @@ func TestKeyDispatcher_DispatchRowChangedEvent(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { d := NewKeyDispatcher(tt.partitionKey) - gotPartition, gotKey, _ := d.DispatchRowChangedEvent(tt.rowChangedEvt, 0) + gotPartition, gotKey, err := d.DispatchRowChangedEvent(tt.rowChangedEvt, 0) + require.NoError(t, err) if gotPartition != tt.wantPartition { t.Errorf("DispatchRowChangedEvent() gotPartition = %v, want %v", gotPartition, tt.wantPartition) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go index 3bd9c612731..55fafcf2719 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go @@ -79,7 +79,8 @@ func TestTableDispatcher(t *testing.T) { } p := NewTableDispatcher() for _, tc := range testCases { - index, _, _ := p.DispatchRowChangedEvent(tc.row, 16) + index, _, err := p.DispatchRowChangedEvent(tc.row, 16) + require.NoError(t, err) require.Equal(t, tc.expectPartition, index) } } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go index 49604cc096e..08e6e1ba9f2 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go @@ -72,7 +72,8 @@ func TestTsDispatcher(t *testing.T) { } p := &TsDispatcher{} for _, tc := range testCases { - index, _, _ := p.DispatchRowChangedEvent(tc.row, 16) + index, _, err := p.DispatchRowChangedEvent(tc.row, 16) + require.NoError(t, err) require.Equal(t, tc.expectPartition, index) } } diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index 92ce2dfdc1e..67a38c8b1af 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -148,11 +148,11 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa topic := s.alive.eventRouter.GetTopicForRowChange(row) partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic) if err != nil { - return errors.Trace(err) + return err } index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum) if err != nil { - return errors.Trace(err) + return err } // This never be blocked because this is an unbounded channel. s.alive.worker.msgChan.In() <- mqEvent{ diff --git a/errors.toml b/errors.toml index e1cbd4c159c..0b3d3b8ad46 100755 --- a/errors.toml +++ b/errors.toml @@ -231,6 +231,11 @@ error = ''' failed to preallocate file because disk is full ''' +["CDC:ErrDispatcherRuntime"] +error = ''' +dispatcher runtime error +''' + ["CDC:ErrEncodeFailed"] error = ''' encode failed: %s @@ -306,11 +311,6 @@ error = ''' filter rule is invalid %v ''' -["CDC:ErrFlowControllerAborted"] -error = ''' -flow controller is aborted -''' - ["CDC:ErrGCTTLExceeded"] error = ''' the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded the GC TTL and the changefeed is blocking global GC progression @@ -346,11 +346,6 @@ error = ''' illegal parameter for sorter: %s ''' -["CDC:ErrIncompatibleConfig"] -error = ''' -incompatible configuration -''' - ["CDC:ErrIncompatibleSinkConfig"] error = ''' incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri @@ -421,11 +416,6 @@ error = ''' invalid replica config, %s ''' -["CDC:ErrInvalidS3URI"] -error = ''' -invalid s3 uri: %s -''' - ["CDC:ErrInvalidServerOption"] error = ''' invalid server option @@ -576,11 +566,6 @@ error = ''' new store failed ''' -["CDC:ErrNoPendingRegion"] -error = ''' -received event regionID %v, requestID %v from %v, but neither pending region nor running region was found -''' - ["CDC:ErrNotController"] error = ''' not controller diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index d88ded6bf25..bb60ce4e6fa 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -90,11 +90,6 @@ var ( "unknown kv optype: %s, entry: %v", errors.RFCCodeText("CDC:ErrUnknownKVEventType"), ) - ErrNoPendingRegion = errors.Normalize( - "received event regionID %v, requestID %v from %v, "+ - "but neither pending region nor running region was found", - errors.RFCCodeText("CDC:ErrNoPendingRegion"), - ) ErrPrewriteNotMatch = errors.Normalize( "prewrite not match, key: %s, start-ts: %d, commit-ts: %d, type: %s, optype: %s", errors.RFCCodeText("CDC:ErrPrewriteNotMatch"), @@ -116,7 +111,7 @@ var ( errors.RFCCodeText("CDC:ErrRegionWorkerExit"), ) - // rule related errors + // codec related errors ErrEncodeFailed = errors.Normalize( "encode failed: %s", errors.RFCCodeText("CDC:ErrEncodeFailed"), @@ -129,6 +124,10 @@ var ( "filter rule is invalid %v", errors.RFCCodeText("CDC:ErrFilterRuleInvalid"), ) + ErrDispatcherRuntime = errors.Normalize( + "dispatcher runtime error", + errors.RFCCodeText("CDC:ErrDispatcherRuntime"), + ) // internal errors ErrAdminStopProcessor = errors.Normalize( @@ -387,10 +386,6 @@ var ( "old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled"), ) - ErrIncompatibleConfig = errors.Normalize( - "incompatible configuration", - errors.RFCCodeText("CDC:ErrIncompatibleConfig"), - ) ErrSinkInvalidConfig = errors.Normalize( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), @@ -742,11 +737,6 @@ var ( "consistent storage (%s) not support", errors.RFCCodeText("CDC:ErrConsistentStorage"), ) - ErrInvalidS3URI = errors.Normalize( - "invalid s3 uri: %s", - errors.RFCCodeText("CDC:ErrInvalidS3URI"), - ) - // sorter errors ErrIllegalSorterParameter = errors.Normalize( "illegal parameter for sorter: %s", @@ -757,12 +747,6 @@ var ( errors.RFCCodeText("ErrConflictingFileLocks"), ) - // miscellaneous internal errors - ErrFlowControllerAborted = errors.Normalize( - "flow controller is aborted", - errors.RFCCodeText("CDC:ErrFlowControllerAborted"), - ) - // retry error ErrReachMaxTry = errors.Normalize("reach maximum try: %s, error: %s", errors.RFCCodeText("CDC:ErrReachMaxTry"), From fc8df7f0aa7e0ba5036135cdd9bb171d850985d7 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sat, 7 Oct 2023 15:26:56 +0800 Subject: [PATCH 04/15] fix typo. --- cdc/api/v2/api_helpers.go | 6 +++--- cdc/api/v2/api_helpers_mock.go | 6 +++--- cdc/api/v2/changefeed.go | 2 +- cdc/api/v2/changefeed_test.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 97fa5efa923..9db856a388b 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -112,8 +112,8 @@ type APIV2Helpers interface { credential *security.Credential, ) (tidbkv.Storage, error) - // getVerfiedTables wraps entry.VerifyTables to increase testability - getVerfiedTables(replicaConfig *config.ReplicaConfig, + // getVerifiedTables wraps entry.VerifyTables to increase testability + getVerifiedTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error, ) @@ -491,7 +491,7 @@ func (h APIV2HelpersImpl) createTiStore(pdAddrs []string, return kv.CreateTiStore(strings.Join(pdAddrs, ","), credential) } -func (h APIV2HelpersImpl) getVerfiedTables(replicaConfig *config.ReplicaConfig, +func (h APIV2HelpersImpl) getVerifiedTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error, ) { diff --git a/cdc/api/v2/api_helpers_mock.go b/cdc/api/v2/api_helpers_mock.go index 733c48140fa..448dd898148 100644 --- a/cdc/api/v2/api_helpers_mock.go +++ b/cdc/api/v2/api_helpers_mock.go @@ -88,9 +88,9 @@ func (mr *MockAPIV2HelpersMockRecorder) getPDClient(ctx, pdAddrs, credential int } // getVerfiedTables mocks base method. -func (m *MockAPIV2Helpers) getVerfiedTables(replicaConfig *config.ReplicaConfig, storage kv.Storage, startTs uint64) ([]model.TableName, []model.TableName, error) { +func (m *MockAPIV2Helpers) getVerifiedTables(replicaConfig *config.ReplicaConfig, storage kv.Storage, startTs uint64) ([]model.TableName, []model.TableName, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "getVerfiedTables", replicaConfig, storage, startTs) + ret := m.ctrl.Call(m, "getVerifiedTables", replicaConfig, storage, startTs) ret0, _ := ret[0].([]model.TableName) ret1, _ := ret[1].([]model.TableName) ret2, _ := ret[2].(error) @@ -100,7 +100,7 @@ func (m *MockAPIV2Helpers) getVerfiedTables(replicaConfig *config.ReplicaConfig, // getVerfiedTables indicates an expected call of getVerfiedTables. func (mr *MockAPIV2HelpersMockRecorder) getVerfiedTables(replicaConfig, storage, startTs interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getVerfiedTables", reflect.TypeOf((*MockAPIV2Helpers)(nil).getVerfiedTables), replicaConfig, storage, startTs) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getVerifiedTables", reflect.TypeOf((*MockAPIV2Helpers)(nil).getVerifiedTables), replicaConfig, storage, startTs) } // verifyCreateChangefeedConfig mocks base method. diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 137d1c1e38d..8c5eafd3495 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -343,7 +343,7 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { } replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig() ineligibleTables, eligibleTables, err := h.helpers. - getVerfiedTables(replicaCfg, kvStore, cfg.StartTs) + getVerifiedTables(replicaCfg, kvStore, cfg.StartTs) if err != nil { _ = c.Error(err) return diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index f85221a2f09..b2c1d718bb6 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -662,7 +662,7 @@ func TestVerifyTable(t *testing.T) { require.Nil(t, err) require.Contains(t, respErr.Code, "ErrNewStore") - // case 3: getVerfiedTables failed + // case 3: getVerifiedTables failed helpers.EXPECT(). createTiStore(gomock.Any(), gomock.Any()). Return(nil, nil). From cb6512cfac07c1d30cb854ebc5fa39bc63fabcde Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sat, 7 Oct 2023 15:48:59 +0800 Subject: [PATCH 05/15] add verify index dispatcher logic when create or update the changefeed. --- cdc/api/v2/api_helpers.go | 43 ++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 9db856a388b..0349cc7b1da 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -29,11 +29,13 @@ import ( "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/owner" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/validator" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/version" "github.com/r3labs/diff" @@ -491,15 +493,42 @@ func (h APIV2HelpersImpl) createTiStore(pdAddrs []string, return kv.CreateTiStore(strings.Join(pdAddrs, ","), credential) } -func (h APIV2HelpersImpl) getVerifiedTables(replicaConfig *config.ReplicaConfig, - storage tidbkv.Storage, startTs uint64) (ineligibleTables, - eligibleTables []model.TableName, err error, -) { +func (h APIV2HelpersImpl) getVerifiedTables( + replicaConfig *config.ReplicaConfig, + storage tidbkv.Storage, startTs uint64, + scheme string, topic string, protocol config.Protocol, +) ([]model.TableName, []model.TableName, error) { f, err := filter.NewFilter(replicaConfig, "") if err != nil { - return + return nil, nil, err } - _, ineligibleTables, eligibleTables, err = entry. + tableInfos, ineligibleTables, eligibleTables, err := entry. VerifyTables(f, storage, startTs) - return + if err != nil { + return nil, nil, err + } + + if !sink.IsMQScheme(scheme) { + return ineligibleTables, eligibleTables, nil + } + + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, protocol, topic, scheme) + if err != nil { + return nil, nil, err + } + + for _, table := range tableInfos { + dummyEvent := &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: table.TableName.Schema, + Table: table.TableName.Table, + }, + TableInfo: table, + } + _, _, err := eventRouter.GetPartitionForRowChange(dummyEvent, 1) + if err != nil { + return nil, nil, err + } + } + return ineligibleTables, eligibleTables, nil } From 359e8265fec8c6112891ce33ea16f0db86fd0f7d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sat, 7 Oct 2023 16:17:37 +0800 Subject: [PATCH 06/15] update mocks. --- cdc/api/v2/api_helpers.go | 4 +++- cdc/api/v2/api_helpers_mock.go | 12 ++++++------ cdc/api/v2/changefeed.go | 15 ++++++++++++++- cdc/api/v2/model.go | 1 + .../mq/dispatcher/partition/index_value.go | 5 +++++ pkg/cmd/cli/cli_changefeed_create.go | 1 + 6 files changed, 30 insertions(+), 8 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 0349cc7b1da..9981574ad0a 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -116,7 +116,9 @@ type APIV2Helpers interface { // getVerifiedTables wraps entry.VerifyTables to increase testability getVerifiedTables(replicaConfig *config.ReplicaConfig, - storage tidbkv.Storage, startTs uint64) (ineligibleTables, + storage tidbkv.Storage, startTs uint64, + scheme string, topic string, protocol config.Protocol, + ) (ineligibleTables, eligibleTables []model.TableName, err error, ) } diff --git a/cdc/api/v2/api_helpers_mock.go b/cdc/api/v2/api_helpers_mock.go index 448dd898148..a1374c364f6 100644 --- a/cdc/api/v2/api_helpers_mock.go +++ b/cdc/api/v2/api_helpers_mock.go @@ -87,20 +87,20 @@ func (mr *MockAPIV2HelpersMockRecorder) getPDClient(ctx, pdAddrs, credential int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getPDClient", reflect.TypeOf((*MockAPIV2Helpers)(nil).getPDClient), ctx, pdAddrs, credential) } -// getVerfiedTables mocks base method. -func (m *MockAPIV2Helpers) getVerifiedTables(replicaConfig *config.ReplicaConfig, storage kv.Storage, startTs uint64) ([]model.TableName, []model.TableName, error) { +// getVerifiedTables mocks base method. +func (m *MockAPIV2Helpers) getVerifiedTables(replicaConfig *config.ReplicaConfig, storage kv.Storage, startTs uint64, scheme, topic string, protocol config.Protocol) ([]model.TableName, []model.TableName, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "getVerifiedTables", replicaConfig, storage, startTs) + ret := m.ctrl.Call(m, "getVerifiedTables", replicaConfig, storage, startTs, scheme, topic, protocol) ret0, _ := ret[0].([]model.TableName) ret1, _ := ret[1].([]model.TableName) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } -// getVerfiedTables indicates an expected call of getVerfiedTables. -func (mr *MockAPIV2HelpersMockRecorder) getVerfiedTables(replicaConfig, storage, startTs interface{}) *gomock.Call { +// getVerifiedTables indicates an expected call of getVerifiedTables. +func (mr *MockAPIV2HelpersMockRecorder) getVerifiedTables(replicaConfig, storage, startTs, scheme, topic, protocol interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getVerifiedTables", reflect.TypeOf((*MockAPIV2Helpers)(nil).getVerifiedTables), replicaConfig, storage, startTs) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getVerifiedTables", reflect.TypeOf((*MockAPIV2Helpers)(nil).getVerifiedTables), replicaConfig, storage, startTs, scheme, topic, protocol) } // verifyCreateChangefeedConfig mocks base method. diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 8c5eafd3495..cd64a9d74b8 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "net/http" + "net/url" "sort" "strings" "time" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/txnutil/gc" @@ -341,9 +343,20 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { _ = c.Error(err) return } + uri, err := url.Parse(cfg.SinkURI) + if err != nil { + _ = c.Error(err) + return + } + scheme := uri.Scheme + topic := strings.TrimFunc(uri.Path, func(r rune) bool { + return r == '/' + }) replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig() + protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(replicaCfg.Sink.Protocol)) + ineligibleTables, eligibleTables, err := h.helpers. - getVerifiedTables(replicaCfg, kvStore, cfg.StartTs) + getVerifiedTables(replicaCfg, kvStore, cfg.StartTs, scheme, topic, protocol) if err != nil { _ = c.Error(err) return diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index e4da00c4928..548fc9fda92 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -68,6 +68,7 @@ type VerifyTableConfig struct { PDConfig ReplicaConfig *ReplicaConfig `json:"replica_config"` StartTs uint64 `json:"start_ts"` + SinkURI string `json:"sink_uri"` } func getDefaultVerifyTableConfig() *VerifyTableConfig { diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 6766b9a5275..2048a5083e4 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -17,9 +17,11 @@ import ( "strconv" "sync" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "go.uber.org/zap" ) // IndexValueDispatcher is a partition dispatcher which dispatches events based on the index value. @@ -64,6 +66,9 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven } else { names, offsets, ok := row.IndexByName(r.indexName) if !ok { + log.Error("index not found", + zap.Any("tableName", row.Table), + zap.String("indexName", r.indexName)) return 0, "", errors.ErrDispatcherRuntime.GenWithStack("index not found: %s", r.indexName) } for idx := 0; idx < len(names); idx++ { diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index 0a7b427adda..4e91f6916f9 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -278,6 +278,7 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e }, ReplicaConfig: createChangefeedCfg.ReplicaConfig, StartTs: createChangefeedCfg.StartTs, + SinkURI: createChangefeedCfg.SinkURI, } tables, err := o.apiClient.Changefeeds().VerifyTable(ctx, verifyTableConfig) From 0958830dc733ff3f985ef8356da175793af85871 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sat, 7 Oct 2023 17:23:12 +0800 Subject: [PATCH 07/15] fix changes. --- cdc/api/v2/changefeed_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index b2c1d718bb6..7134f0ffc71 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -178,7 +178,8 @@ func TestCreateChangefeed(t *testing.T) { helpers.EXPECT(). getEtcdClient(gomock.Any(), gomock.Any()). Return(testEtcdCluster.RandClient(), nil) - helpers.EXPECT().getVerfiedTables(gomock.Any(), gomock.Any(), gomock.Any()). + helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, nil, nil). AnyTimes() helpers.EXPECT(). @@ -667,7 +668,8 @@ func TestVerifyTable(t *testing.T) { createTiStore(gomock.Any(), gomock.Any()). Return(nil, nil). AnyTimes() - helpers.EXPECT().getVerfiedTables(gomock.Any(), gomock.Any(), gomock.Any()). + helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, nil, cerrors.ErrFilterRuleInvalid). Times(1) @@ -688,7 +690,8 @@ func TestVerifyTable(t *testing.T) { ineligible := []model.TableName{ {Schema: "test", Table: "invalidTable"}, } - helpers.EXPECT().getVerfiedTables(gomock.Any(), gomock.Any(), gomock.Any()). + helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()). Return(eligible, ineligible, nil) w = httptest.NewRecorder() From add745c3ecb80d97455187833a4549e75f8e88a6 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sun, 8 Oct 2023 14:55:06 +0800 Subject: [PATCH 08/15] adjust logs. --- cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 2048a5083e4..5452c84c8c4 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -66,10 +66,10 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven } else { names, offsets, ok := row.IndexByName(r.indexName) if !ok { - log.Error("index not found", + log.Error("index not found when dispatch event", zap.Any("tableName", row.Table), zap.String("indexName", r.indexName)) - return 0, "", errors.ErrDispatcherRuntime.GenWithStack("index not found: %s", r.indexName) + return 0, "", errors.ErrDispatcherRuntime.GenWithStack("index not found when dispatch event: %s", r.indexName) } for idx := 0; idx < len(names); idx++ { r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) From 78a5d2441190b8e82e1df0cc5f22bfcb3a721b8a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sun, 8 Oct 2023 15:47:51 +0800 Subject: [PATCH 09/15] add some logs. --- cdc/api/v2/model.go | 3 +++ cdc/sink/dmlsink/mq/dispatcher/event_router.go | 6 ++++-- cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go | 4 ++++ pkg/config/sink.go | 2 +- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 548fc9fda92..18f073eb7ff 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -273,6 +273,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Matcher: rule.Matcher, DispatcherRule: "", PartitionRule: rule.PartitionRule, + IndexName: rule.IndexName, TopicRule: rule.TopicRule, }) } @@ -548,6 +549,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { dispatchRules = append(dispatchRules, &DispatchRule{ Matcher: rule.Matcher, PartitionRule: rule.PartitionRule, + IndexName: rule.IndexName, TopicRule: rule.TopicRule, }) } @@ -908,6 +910,7 @@ type LargeMessageHandleConfig struct { type DispatchRule struct { Matcher []string `json:"matcher,omitempty"` PartitionRule string `json:"partition"` + IndexName string `json:"index"` TopicRule string `json:"topic"` } diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go index 7b12c9117db..740fbb1f969 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go @@ -184,8 +184,10 @@ func getPartitionDispatcher(rule string, scheme string, indexName string) partit return partition.NewTsDispatcher() case "table": return partition.NewTableDispatcher() - case "index-value", "rowid": - log.Warn("rowid is deprecated, please use index-value instead.") + case "index-value": + return partition.NewIndexValueDispatcher(indexName) + case "rowid": + log.Warn("rowid is deprecated, index-value is used as the partition dispatcher.") return partition.NewIndexValueDispatcher(indexName) default: } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 5452c84c8c4..e784e889dc0 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -71,6 +71,10 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven zap.String("indexName", r.indexName)) return 0, "", errors.ErrDispatcherRuntime.GenWithStack("index not found when dispatch event: %s", r.indexName) } + log.Info("index found when dispatch event", + zap.Any("tableName", row.Table), + zap.String("indexName", r.indexName), + zap.Any("names", names), zap.Any("offsets", offsets)) for idx := 0; idx < len(names); idx++ { r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 4303881407e..e78ad20a205 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -292,7 +292,7 @@ type DispatchRule struct { // PartitionRule is an alias added for DispatcherRule to mitigate confusions. // In the future release, the DispatcherRule is expected to be removed . PartitionRule string `toml:"partition" json:"partition"` - IndexName string `toml:"index-name" json:"index-name"` + IndexName string `toml:"index" json:"index"` TopicRule string `toml:"topic" json:"topic"` } From bfb2969a7e26452bf033bf4736cd4e07144cf967 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Oct 2023 15:29:16 +0800 Subject: [PATCH 10/15] tiny adjust. --- cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go | 4 ---- pkg/errors/helper.go | 1 + 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index e784e889dc0..5452c84c8c4 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -71,10 +71,6 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven zap.String("indexName", r.indexName)) return 0, "", errors.ErrDispatcherRuntime.GenWithStack("index not found when dispatch event: %s", r.indexName) } - log.Info("index found when dispatch event", - zap.Any("tableName", row.Table), - zap.String("indexName", r.indexName), - zap.Any("names", names), zap.Any("offsets", offsets)) for idx := 0; idx < len(names); idx++ { r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) } diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index c8e5019a639..8e83cff82dc 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -77,6 +77,7 @@ var changefeedUnRetryableErrors = []*errors.Error{ ErrSyncRenameTableFailed, ErrChangefeedUnretryable, ErrCorruptedDataMutation, + ErrDispatcherRuntime, ErrSinkURIInvalid, ErrKafkaInvalidConfig, From 3363e2c15710d9b89569f9ca278b5fb85865b83b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Oct 2023 17:11:54 +0800 Subject: [PATCH 11/15] fix bugs. --- cdc/api/v2/api_helpers.go | 16 ++++--------- cdc/model/schema_storage.go | 16 +++++++++++++ cdc/model/sink.go | 16 ------------- .../dmlsink/mq/dispatcher/event_router.go | 15 ++++++++++++ .../mq/dispatcher/partition/index_value.go | 13 +++++----- .../dispatcher/partition/index_value_test.go | 2 +- cdc/sink/dmlsink/mq/mq_dml_sink.go | 24 +++++++++++++------ errors.toml | 4 ++-- pkg/errors/cdc_errors.go | 7 +++--- pkg/errors/helper.go | 2 +- 10 files changed, 67 insertions(+), 48 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 9981574ad0a..f84037747f6 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -519,18 +519,10 @@ func (h APIV2HelpersImpl) getVerifiedTables( return nil, nil, err } - for _, table := range tableInfos { - dummyEvent := &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: table.TableName.Schema, - Table: table.TableName.Table, - }, - TableInfo: table, - } - _, _, err := eventRouter.GetPartitionForRowChange(dummyEvent, 1) - if err != nil { - return nil, nil, err - } + err = eventRouter.VerifyTables(tableInfos) + if err != nil { + return nil, nil, err } + return ineligibleTables, eligibleTables, nil } diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 869c5d38aca..c9a2b21da39 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -315,3 +315,19 @@ func (ti *TableInfo) IsIndexUnique(indexInfo *model.IndexInfo) bool { func (ti *TableInfo) Clone() *TableInfo { return WrapTableInfo(ti.SchemaID, ti.TableName.Schema, ti.Version, ti.TableInfo.Clone()) } + +// IndexByName returns the index columns and offsets of the corresponding index by name +func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) { + for _, index := range ti.Indices { + if index.Name.O == name { + names := make([]string, 0, len(index.Columns)) + offset := make([]int, 0, len(index.Columns)) + for _, col := range index.Columns { + names = append(names, col.Name.O) + offset = append(offset, col.Offset) + } + return names, offset, true + } + } + return nil, nil, false +} diff --git a/cdc/model/sink.go b/cdc/model/sink.go index aef82a8fcc2..decb3d1e8cc 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -507,22 +507,6 @@ func (r *RowChangedEvent) ApproximateBytes() int { return size } -// IndexByName returns the index columns and offsets of the corresponding index by name -func (r *RowChangedEvent) IndexByName(name string) ([]string, []int, bool) { - for _, index := range r.TableInfo.Indices { - if index.Name.O == name { - names := make([]string, 0, len(index.Columns)) - offset := make([]int, 0, len(index.Columns)) - for _, col := range index.Columns { - names = append(names, col.Name.O) - offset = append(offset, col.Offset) - } - return names, offset, true - } - } - return nil, nil, false -} - // Column represents a column value in row changed event type Column struct { Name string `json:"name" msg:"name"` diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go index 740fbb1f969..a11013cf7b5 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go @@ -126,6 +126,21 @@ func (s *EventRouter) GetPartitionForRowChange( ) } +// VerifyTables return error if any one table route rule is invalid. +func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error { + for _, table := range infos { + _, partitionDispatcher := s.matchDispatcher(table.TableName.Schema, table.TableName.Table) + if v, ok := partitionDispatcher.(*partition.IndexValueDispatcher); ok { + _, _, ok = table.IndexByName(v.IndexName) + if !ok { + return cerror.ErrDispatcherFailed.GenWithStack( + "index not found when verify the table, table: %v, index: %s", table.TableName, v.IndexName) + } + } + } + return nil +} + // GetActiveTopics returns a list of the corresponding topics // for the tables that are actively synchronized. func (s *EventRouter) GetActiveTopics(activeTables []model.TableName) []string { diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 5452c84c8c4..0a41369d589 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -29,14 +29,14 @@ type IndexValueDispatcher struct { hasher *hash.PositionInertia lock sync.Mutex - indexName string + IndexName string } // NewIndexValueDispatcher creates a IndexValueDispatcher. func NewIndexValueDispatcher(indexName string) *IndexValueDispatcher { return &IndexValueDispatcher{ hasher: hash.NewPositionInertia(), - indexName: indexName, + IndexName: indexName, } } @@ -54,7 +54,7 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven } // the most normal case, index-name is not set, use the handle key columns. - if r.indexName == "" { + if r.IndexName == "" { for _, col := range dispatchCols { if col == nil { continue @@ -64,12 +64,13 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven } } } else { - names, offsets, ok := row.IndexByName(r.indexName) + names, offsets, ok := row.TableInfo.IndexByName(r.IndexName) if !ok { log.Error("index not found when dispatch event", zap.Any("tableName", row.Table), - zap.String("indexName", r.indexName)) - return 0, "", errors.ErrDispatcherRuntime.GenWithStack("index not found when dispatch event: %s", r.indexName) + zap.String("indexName", r.IndexName)) + return 0, "", errors.ErrDispatcherFailed.GenWithStack( + "index not found when dispatch event, table: %v, index: %s", row.Table, r.IndexName) } for idx := 0; idx < len(names); idx++ { r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go index a9cd51ac1a9..73d603e67b9 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go @@ -193,7 +193,7 @@ func TestIndexValueDispatcherWithIndexName(t *testing.T) { p := NewIndexValueDispatcher("index2") _, _, err := p.DispatchRowChangedEvent(event, 16) - require.ErrorIs(t, err, errors.ErrDispatcherRuntime) + require.ErrorIs(t, err, errors.ErrDispatcherFailed) p = NewIndexValueDispatcher("index1") index, _, err := p.DispatchRowChangedEvent(event, 16) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index 67a38c8b1af..e883bc7ad5e 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -64,6 +64,8 @@ type dmlSink struct { wg sync.WaitGroup dead chan struct{} + errCh chan error + scheme string } @@ -90,7 +92,9 @@ func newDMLSink( ctx: ctx, cancel: cancel, dead: make(chan struct{}), - scheme: scheme, + errCh: errCh, + + scheme: scheme, } s.alive.eventRouter = eventRouter s.alive.topicManager = topicManager @@ -108,17 +112,21 @@ func newDMLSink( s.alive.Unlock() close(s.dead) - if err != nil && errors.Cause(err) != context.Canceled { - select { - case <-ctx.Done(): - case errCh <- err: - } - } + s.handleError(err) }() return s } +func (s *dmlSink) handleError(err error) { + if err != nil && errors.Cause(err) != context.Canceled { + select { + case <-s.ctx.Done(): + case s.errCh <- err: + } + } +} + // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTableTxn]) error { @@ -148,10 +156,12 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa topic := s.alive.eventRouter.GetTopicForRowChange(row) partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic) if err != nil { + s.handleError(err) return err } index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum) if err != nil { + s.handleError(err) return err } // This never be blocked because this is an unbounded channel. diff --git a/errors.toml b/errors.toml index 0b3d3b8ad46..c000bfbb739 100755 --- a/errors.toml +++ b/errors.toml @@ -231,9 +231,9 @@ error = ''' failed to preallocate file because disk is full ''' -["CDC:ErrDispatcherRuntime"] +["CDC:ErrDispatcherFailed"] error = ''' -dispatcher runtime error +dispatcher failed ''' ["CDC:ErrEncodeFailed"] diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index bb60ce4e6fa..22e1d613a10 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -124,9 +124,10 @@ var ( "filter rule is invalid %v", errors.RFCCodeText("CDC:ErrFilterRuleInvalid"), ) - ErrDispatcherRuntime = errors.Normalize( - "dispatcher runtime error", - errors.RFCCodeText("CDC:ErrDispatcherRuntime"), + + ErrDispatcherFailed = errors.Normalize( + "dispatcher failed", + errors.RFCCodeText("CDC:ErrDispatcherFailed"), ) // internal errors diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 8e83cff82dc..9f5a3055bac 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -77,7 +77,7 @@ var changefeedUnRetryableErrors = []*errors.Error{ ErrSyncRenameTableFailed, ErrChangefeedUnretryable, ErrCorruptedDataMutation, - ErrDispatcherRuntime, + ErrDispatcherFailed, ErrSinkURIInvalid, ErrKafkaInvalidConfig, From dce3695b4503d5e4ef9499d38f368ff4e11d8d2d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Oct 2023 17:32:24 +0800 Subject: [PATCH 12/15] fix bugs. --- cdc/model/schema_storage_test.go | 34 +++++++++++++++++++++++++++++++- cdc/model/sink_test.go | 33 ------------------------------- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 67b7bb346f9..bf44634b3b7 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -48,7 +48,7 @@ func TestHandleKeyPriority(t *testing.T) { State: timodel.StatePublic, }, { - Name: timodel.CIStr{O: "d"}, + Name: timodel.CIStr{O: "d"}, FieldType: parser_types.FieldType{ // test not null unique index // Flag: mysql.NotNullFlag, @@ -270,3 +270,35 @@ func TestTableInfoClone(t *testing.T) { cloned.SchemaID = 100 require.Equal(t, int64(10), info.SchemaID) } + +func TestIndexByName(t *testing.T) { + + tableInfo := &TableInfo{ + TableInfo: &timodel.TableInfo{ + Indices: []*timodel.IndexInfo{ + { + Name: timodel.CIStr{ + O: "idx1", + }, + Columns: []*timodel.IndexColumn{ + { + Name: timodel.CIStr{ + O: "col1", + }, + }, + }, + }, + }, + }, + } + + names, offsets, ok := tableInfo.IndexByName("idx2") + require.False(t, ok) + require.Nil(t, names) + require.Nil(t, offsets) + + names, offsets, ok = tableInfo.IndexByName("idx1") + require.True(t, ok) + require.Equal(t, []string{"col1"}, names) + require.Equal(t, []int{0}, offsets) +} diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 334bf3aed58..eb899d6c109 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -609,36 +609,3 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { require.NoError(t, err) require.Len(t, txn.Rows, 1) } - -func TestIndexByName(t *testing.T) { - event := &RowChangedEvent{ - TableInfo: &TableInfo{ - TableInfo: &timodel.TableInfo{ - Indices: []*timodel.IndexInfo{ - { - Name: timodel.CIStr{ - O: "idx1", - }, - Columns: []*timodel.IndexColumn{ - { - Name: timodel.CIStr{ - O: "col1", - }, - }, - }, - }, - }, - }, - }, - } - - names, offsets, ok := event.IndexByName("idx2") - require.False(t, ok) - require.Nil(t, names) - require.Nil(t, offsets) - - names, offsets, ok = event.IndexByName("idx1") - require.True(t, ok) - require.Equal(t, []string{"col1"}, names) - require.Equal(t, []int{0}, offsets) -} From 4877d42ce715a3ed8a35a5bbc3d549a0638f7259 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Oct 2023 17:33:00 +0800 Subject: [PATCH 13/15] tiny adjust. --- cdc/model/schema_storage_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index bf44634b3b7..9d4791a4799 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -48,7 +48,7 @@ func TestHandleKeyPriority(t *testing.T) { State: timodel.StatePublic, }, { - Name: timodel.CIStr{O: "d"}, + Name: timodel.CIStr{O: "d"}, FieldType: parser_types.FieldType{ // test not null unique index // Flag: mysql.NotNullFlag, @@ -272,7 +272,6 @@ func TestTableInfoClone(t *testing.T) { } func TestIndexByName(t *testing.T) { - tableInfo := &TableInfo{ TableInfo: &timodel.TableInfo{ Indices: []*timodel.IndexInfo{ From 6438ad8700d45ae69eb0f59abe96db7ef939ea1c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 11 Oct 2023 11:01:35 +0800 Subject: [PATCH 14/15] update swagger. --- docs/swagger/docs.go | 5 ++++- docs/swagger/swagger.json | 5 ++++- docs/swagger/swagger.yaml | 4 +++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index c2f5e52d8fa..c1948fb22ec 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1428,7 +1428,7 @@ var doc = `{ "description": "Deprecated, please use PartitionRule.", "type": "string" }, - "index-name": { + "index": { "type": "string" }, "matcher": { @@ -2380,6 +2380,9 @@ var doc = `{ "v2.DispatchRule": { "type": "object", "properties": { + "index": { + "type": "string" + }, "matcher": { "type": "array", "items": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 009460ab198..3bf59aae04b 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1409,7 +1409,7 @@ "description": "Deprecated, please use PartitionRule.", "type": "string" }, - "index-name": { + "index": { "type": "string" }, "matcher": { @@ -2361,6 +2361,9 @@ "v2.DispatchRule": { "type": "object", "properties": { + "index": { + "type": "string" + }, "matcher": { "type": "array", "items": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 1c9ed74d5d0..3481eb99ca2 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -57,7 +57,7 @@ definitions: dispatcher: description: Deprecated, please use PartitionRule. type: string - index-name: + index: type: string matcher: items: @@ -734,6 +734,8 @@ definitions: type: object v2.DispatchRule: properties: + index: + type: string matcher: items: type: string From 4eccbe6b01d28cfb0bd4d37fff0ac69046873a98 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 11 Oct 2023 14:49:25 +0800 Subject: [PATCH 15/15] revert changes in the dml sink. --- cdc/sink/dmlsink/mq/mq_dml_sink.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index cf1cf59f2e0..b18b1017bcd 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -67,8 +67,6 @@ type dmlSink struct { wg sync.WaitGroup dead chan struct{} - errCh chan error - scheme string } @@ -95,9 +93,7 @@ func newDMLSink( ctx: ctx, cancel: cancel, dead: make(chan struct{}), - errCh: errCh, - - scheme: scheme, + scheme: scheme, } s.alive.eventRouter = eventRouter s.alive.topicManager = topicManager @@ -115,21 +111,17 @@ func newDMLSink( s.alive.Unlock() close(s.dead) - s.handleError(err) + if err != nil && errors.Cause(err) != context.Canceled { + select { + case <-ctx.Done(): + case errCh <- err: + } + } }() return s } -func (s *dmlSink) handleError(err error) { - if err != nil && errors.Cause(err) != context.Canceled { - select { - case <-s.ctx.Done(): - case s.errCh <- err: - } - } -} - // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTableTxn]) error {