Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka(ticdc): index-value dispatcher support set index names. #9825

Merged
Merged
51 changes: 41 additions & 10 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/validator"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/version"
"github.com/r3labs/diff"
Expand Down Expand Up @@ -112,9 +114,11 @@ type APIV2Helpers interface {
credential *security.Credential,
) (tidbkv.Storage, error)

// getVerfiedTables wraps entry.VerifyTables to increase testability
getVerfiedTables(replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64) (ineligibleTables,
// getVerifiedTables wraps entry.VerifyTables to increase testability
getVerifiedTables(replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64,
scheme string, topic string, protocol config.Protocol,
) (ineligibleTables,
eligibleTables []model.TableName, err error,
)
}
Expand Down Expand Up @@ -491,15 +495,42 @@ func (h APIV2HelpersImpl) createTiStore(pdAddrs []string,
return kv.CreateTiStore(strings.Join(pdAddrs, ","), credential)
}

func (h APIV2HelpersImpl) getVerfiedTables(replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64) (ineligibleTables,
eligibleTables []model.TableName, err error,
) {
func (h APIV2HelpersImpl) getVerifiedTables(
replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64,
scheme string, topic string, protocol config.Protocol,
) ([]model.TableName, []model.TableName, error) {
f, err := filter.NewFilter(replicaConfig, "")
if err != nil {
return
return nil, nil, err
}
_, ineligibleTables, eligibleTables, err = entry.
tableInfos, ineligibleTables, eligibleTables, err := entry.
VerifyTables(f, storage, startTs)
return
if err != nil {
return nil, nil, err
}

if !sink.IsMQScheme(scheme) {
return ineligibleTables, eligibleTables, nil
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, protocol, topic, scheme)
if err != nil {
return nil, nil, err
}

for _, table := range tableInfos {
dummyEvent := &model.RowChangedEvent{
Table: &model.TableName{
Schema: table.TableName.Schema,
Table: table.TableName.Table,
},
TableInfo: table,
}
_, _, err := eventRouter.GetPartitionForRowChange(dummyEvent, 1)
if err != nil {
return nil, nil, err
}
}
return ineligibleTables, eligibleTables, nil
}
12 changes: 6 additions & 6 deletions cdc/api/v2/api_helpers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"net/http"
"net/url"
"sort"
"strings"
"time"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
Expand Down Expand Up @@ -341,9 +343,20 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
_ = c.Error(err)
return
}
uri, err := url.Parse(cfg.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
scheme := uri.Scheme
topic := strings.TrimFunc(uri.Path, func(r rune) bool {
return r == '/'
})
replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig()
protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(replicaCfg.Sink.Protocol))

ineligibleTables, eligibleTables, err := h.helpers.
getVerfiedTables(replicaCfg, kvStore, cfg.StartTs)
getVerifiedTables(replicaCfg, kvStore, cfg.StartTs, scheme, topic, protocol)
if err != nil {
_ = c.Error(err)
return
Expand Down
11 changes: 7 additions & 4 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ func TestCreateChangefeed(t *testing.T) {
helpers.EXPECT().
getEtcdClient(gomock.Any(), gomock.Any()).
Return(testEtcdCluster.RandClient(), nil)
helpers.EXPECT().getVerfiedTables(gomock.Any(), gomock.Any(), gomock.Any()).
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil, nil).
AnyTimes()
helpers.EXPECT().
Expand Down Expand Up @@ -662,12 +663,13 @@ func TestVerifyTable(t *testing.T) {
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrNewStore")

// case 3: getVerfiedTables failed
// case 3: getVerifiedTables failed
helpers.EXPECT().
createTiStore(gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
helpers.EXPECT().getVerfiedTables(gomock.Any(), gomock.Any(), gomock.Any()).
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil, cerrors.ErrFilterRuleInvalid).
Times(1)

Expand All @@ -688,7 +690,8 @@ func TestVerifyTable(t *testing.T) {
ineligible := []model.TableName{
{Schema: "test", Table: "invalidTable"},
}
helpers.EXPECT().getVerfiedTables(gomock.Any(), gomock.Any(), gomock.Any()).
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).
Return(eligible, ineligible, nil)

w = httptest.NewRecorder()
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type VerifyTableConfig struct {
PDConfig
ReplicaConfig *ReplicaConfig `json:"replica_config"`
StartTs uint64 `json:"start_ts"`
SinkURI string `json:"sink_uri"`
}

func getDefaultVerifyTableConfig() *VerifyTableConfig {
Expand Down
16 changes: 16 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,22 @@ func (r *RowChangedEvent) ApproximateBytes() int {
return size
}

// IndexByName returns the index columns and offsets of the corresponding index by name
func (r *RowChangedEvent) IndexByName(name string) ([]string, []int, bool) {
for _, index := range r.TableInfo.Indices {
if index.Name.O == name {
names := make([]string, 0, len(index.Columns))
offset := make([]int, 0, len(index.Columns))
for _, col := range index.Columns {
names = append(names, col.Name.O)
offset = append(offset, col.Offset)
}
return names, offset, true
}
}
return nil, nil, false
}

// Column represents a column value in row changed event
type Column struct {
Name string `json:"name" msg:"name"`
Expand Down
33 changes: 33 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,3 +609,36 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
}

func TestIndexByName(t *testing.T) {
event := &RowChangedEvent{
TableInfo: &TableInfo{
TableInfo: &timodel.TableInfo{
Indices: []*timodel.IndexInfo{
{
Name: timodel.CIStr{
O: "idx1",
},
Columns: []*timodel.IndexColumn{
{
Name: timodel.CIStr{
O: "col1",
},
},
},
},
},
},
},
}

names, offsets, ok := event.IndexByName("idx2")
require.False(t, ok)
require.Nil(t, names)
require.Nil(t, offsets)

names, offsets, ok = event.IndexByName("idx1")
require.True(t, ok)
require.Equal(t, []string{"col1"}, names)
require.Equal(t, []int{0}, offsets)
}
8 changes: 4 additions & 4 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewEventRouter(
f = filter.CaseInsensitive(f)
}

d := getPartitionDispatcher(ruleConfig.PartitionRule, scheme)
d := getPartitionDispatcher(ruleConfig.PartitionRule, scheme, ruleConfig.IndexName)
t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol, scheme)
if err != nil {
return nil, err
Expand Down Expand Up @@ -116,7 +116,7 @@ func (s *EventRouter) GetTopicForDDL(ddl *model.DDLEvent) string {
func (s *EventRouter) GetPartitionForRowChange(
row *model.RowChangedEvent,
partitionNum int32,
) (int32, string) {
) (int32, string, error) {
_, partitionDispatcher := s.matchDispatcher(
row.Table.Schema, row.Table.Table,
)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (s *EventRouter) matchDispatcher(
}

// getPartitionDispatcher returns the partition dispatcher for a specific partition rule.
func getPartitionDispatcher(rule string, scheme string) partition.Dispatcher {
func getPartitionDispatcher(rule string, scheme string, indexName string) partition.Dispatcher {
switch strings.ToLower(rule) {
case "default":
return partition.NewDefaultDispatcher()
Expand All @@ -186,7 +186,7 @@ func getPartitionDispatcher(rule string, scheme string) partition.Dispatcher {
return partition.NewTableDispatcher()
case "index-value", "rowid":
log.Warn("rowid is deprecated, please use index-value instead.")
return partition.NewIndexValueDispatcher()
return partition.NewIndexValueDispatcher(indexName)
default:
}

Expand Down
15 changes: 10 additions & 5 deletions cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
d, err := NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "test", sink.KafkaScheme)
require.NoError(t, err)

p, _ := d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _, err := d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default1", Table: "table"},
Columns: []*model.Column{
{
Expand All @@ -203,9 +203,10 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
IndexColumns: [][]int{{0}},
}, 16)
require.NoError(t, err)
require.Equal(t, int32(14), p)

p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default2", Table: "table"},
Columns: []*model.Column{
{
Expand All @@ -216,15 +217,17 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
IndexColumns: [][]int{{0}},
}, 16)
require.NoError(t, err)
require.Equal(t, int32(0), p)

p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_table", Table: "table"},
CommitTs: 1,
}, 16)
require.NoError(t, err)
require.Equal(t, int32(15), p)

p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_index_value", Table: "table"},
Columns: []*model.Column{
{
Expand All @@ -238,12 +241,14 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
},
}, 10)
require.NoError(t, err)
require.Equal(t, int32(1), p)

p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _, err = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "a", Table: "table"},
CommitTs: 1,
}, 2)
require.NoError(t, err)
require.Equal(t, int32(1), p)
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ func NewDefaultDispatcher() *DefaultDispatcher {

// DispatchRowChangedEvent returns the target partition to which
// a row changed event should be dispatched.
func (d *DefaultDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) {
func (d *DefaultDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) {
return d.tbd.DispatchRowChangedEvent(row, partitionNum)
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestDefaultDispatcher(t *testing.T) {
IndexColumns: [][]int{{0}},
}

targetPartition, _ := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3)
targetPartition, _, err := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3)
require.NoError(t, err)
require.Equal(t, int32(0), targetPartition)
}
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ type Dispatcher interface {
// DispatchRowChangedEvent returns an index of partitions or a partition key
// according to RowChangedEvent.
// Concurrency Note: This method is thread-safe.
DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string)
DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error)
}
Loading
Loading