diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 0880ddc6749..af9887b0240 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -275,6 +275,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( DispatcherRule: "", PartitionRule: rule.PartitionRule, IndexName: rule.IndexName, + Columns: rule.Columns, TopicRule: rule.TopicRule, }) } @@ -554,6 +555,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Matcher: rule.Matcher, PartitionRule: rule.PartitionRule, IndexName: rule.IndexName, + Columns: rule.Columns, TopicRule: rule.TopicRule, }) } @@ -915,9 +917,10 @@ type LargeMessageHandleConfig struct { // This is a duplicate of config.DispatchRule type DispatchRule struct { Matcher []string `json:"matcher,omitempty"` - PartitionRule string `json:"partition"` - IndexName string `json:"index"` - TopicRule string `json:"topic"` + PartitionRule string `json:"partition,omitempty"` + IndexName string `json:"index,omitempty"` + Columns []string `json:"columns,omitempty"` + TopicRule string `json:"topic,omitempty"` } // ColumnSelector represents a column selector for a table. diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index c9a2b21da39..26d904bba0c 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -316,18 +316,50 @@ func (ti *TableInfo) Clone() *TableInfo { return WrapTableInfo(ti.SchemaID, ti.TableName.Schema, ti.Version, ti.TableInfo.Clone()) } +// GetIndex return the corresponding index by the given name. +func (ti *TableInfo) GetIndex(name string) *model.IndexInfo { + for _, index := range ti.Indices { + if index != nil && index.Name.O == name { + return index + } + } + return nil +} + // IndexByName returns the index columns and offsets of the corresponding index by name func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) { - for _, index := range ti.Indices { - if index.Name.O == name { - names := make([]string, 0, len(index.Columns)) - offset := make([]int, 0, len(index.Columns)) - for _, col := range index.Columns { - names = append(names, col.Name.O) - offset = append(offset, col.Offset) - } - return names, offset, true + index := ti.GetIndex(name) + if index == nil { + return nil, nil, false + } + names := make([]string, 0, len(index.Columns)) + offset := make([]int, 0, len(index.Columns)) + for _, col := range index.Columns { + names = append(names, col.Name.O) + offset = append(offset, col.Offset) + } + return names, offset, true +} + +// ColumnsByNames returns the column offsets of the corresponding columns by names +// If any column does not exist, return false +func (ti *TableInfo) ColumnsByNames(names []string) ([]int, bool) { + // todo: optimize it + columnOffsets := make(map[string]int, len(ti.Columns)) + for _, col := range ti.Columns { + if col != nil { + columnOffsets[col.Name.O] = col.Offset } } - return nil, nil, false + + result := make([]int, 0, len(names)) + for _, col := range names { + offset, ok := columnOffsets[col] + if !ok { + return nil, false + } + result = append(result, offset) + } + + return result, true } diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 9d4791a4799..0da4e756153 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -273,6 +273,16 @@ func TestTableInfoClone(t *testing.T) { func TestIndexByName(t *testing.T) { tableInfo := &TableInfo{ + TableInfo: &timodel.TableInfo{ + Indices: nil, + }, + } + names, offsets, ok := tableInfo.IndexByName("idx1") + require.False(t, ok) + require.Nil(t, names) + require.Nil(t, offsets) + + tableInfo = &TableInfo{ TableInfo: &timodel.TableInfo{ Indices: []*timodel.IndexInfo{ { @@ -291,7 +301,7 @@ func TestIndexByName(t *testing.T) { }, } - names, offsets, ok := tableInfo.IndexByName("idx2") + names, offsets, ok = tableInfo.IndexByName("idx2") require.False(t, ok) require.Nil(t, names) require.Nil(t, offsets) @@ -301,3 +311,45 @@ func TestIndexByName(t *testing.T) { require.Equal(t, []string{"col1"}, names) require.Equal(t, []int{0}, offsets) } + +func TestColumnsByNames(t *testing.T) { + tableInfo := &TableInfo{ + TableInfo: &timodel.TableInfo{ + Columns: []*timodel.ColumnInfo{ + { + Name: timodel.CIStr{ + O: "col2", + }, + Offset: 1, + }, + { + Name: timodel.CIStr{ + O: "col1", + }, + Offset: 0, + }, + { + Name: timodel.CIStr{ + O: "col3", + }, + Offset: 2, + }, + }, + }, + } + + names := []string{"col1", "col2", "col3"} + offsets, ok := tableInfo.ColumnsByNames(names) + require.True(t, ok) + require.Equal(t, []int{0, 1, 2}, offsets) + + names = []string{"col2"} + offsets, ok = tableInfo.ColumnsByNames(names) + require.True(t, ok) + require.Equal(t, []int{1}, offsets) + + names = []string{"col1", "col-not-found"} + offsets, ok = tableInfo.ColumnsByNames(names) + require.False(t, ok) + require.Nil(t, offsets) +} diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go index a11013cf7b5..53fc0c7fd57 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go @@ -67,7 +67,9 @@ func NewEventRouter( f = filter.CaseInsensitive(f) } - d := getPartitionDispatcher(ruleConfig.PartitionRule, scheme, ruleConfig.IndexName) + d := getPartitionDispatcher( + ruleConfig.PartitionRule, scheme, ruleConfig.IndexName, ruleConfig.Columns, + ) t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol, scheme) if err != nil { return nil, err @@ -130,12 +132,27 @@ func (s *EventRouter) GetPartitionForRowChange( func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error { for _, table := range infos { _, partitionDispatcher := s.matchDispatcher(table.TableName.Schema, table.TableName.Table) - if v, ok := partitionDispatcher.(*partition.IndexValueDispatcher); ok { - _, _, ok = table.IndexByName(v.IndexName) - if !ok { + switch v := partitionDispatcher.(type) { + case *partition.IndexValueDispatcher: + index := table.GetIndex(v.IndexName) + if index == nil { return cerror.ErrDispatcherFailed.GenWithStack( "index not found when verify the table, table: %v, index: %s", table.TableName, v.IndexName) } + // only allow the unique index to be set. + // For the non-unique index, if any column belongs to the index is updated, + // the event is not split, it may cause incorrect data consumption. + if !index.Unique { + return cerror.ErrDispatcherFailed.GenWithStack( + "index is not unique when verify the table, table: %v, index: %s", table.TableName, v.IndexName) + } + case *partition.ColumnsDispatcher: + _, ok := table.ColumnsByNames(v.Columns) + if !ok { + return cerror.ErrDispatcherFailed.GenWithStack( + "columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns) + } + default: } } return nil @@ -191,7 +208,9 @@ func (s *EventRouter) matchDispatcher( } // getPartitionDispatcher returns the partition dispatcher for a specific partition rule. -func getPartitionDispatcher(rule string, scheme string, indexName string) partition.Dispatcher { +func getPartitionDispatcher( + rule string, scheme string, indexName string, columns []string, +) partition.Dispatcher { switch strings.ToLower(rule) { case "default": return partition.NewDefaultDispatcher() @@ -204,6 +223,8 @@ func getPartitionDispatcher(rule string, scheme string, indexName string) partit case "rowid": log.Warn("rowid is deprecated, index-value is used as the partition dispatcher.") return partition.NewIndexValueDispatcher(indexName) + case "columns": + return partition.NewColumnsDispatcher(columns) default: } @@ -211,7 +232,7 @@ func getPartitionDispatcher(rule string, scheme string, indexName string) partit return partition.NewKeyDispatcher(rule) } - log.Warn("the partition dispatch rule is not default/ts/table/index-value," + + log.Warn("the partition dispatch rule is not default/ts/table/index-value/columns," + " use the default rule instead.") return partition.NewDefaultDispatcher() } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go b/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go new file mode 100644 index 00000000000..e4a14cfcf2f --- /dev/null +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go @@ -0,0 +1,73 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package partition + +import ( + "strconv" + "sync" + + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/hash" + "go.uber.org/zap" +) + +// ColumnsDispatcher is a partition dispatcher +// which dispatches events based on the given columns. +type ColumnsDispatcher struct { + hasher *hash.PositionInertia + lock sync.Mutex + + Columns []string +} + +// NewColumnsDispatcher creates a ColumnsDispatcher. +func NewColumnsDispatcher(columns []string) *ColumnsDispatcher { + return &ColumnsDispatcher{ + hasher: hash.NewPositionInertia(), + Columns: columns, + } +} + +// DispatchRowChangedEvent returns the target partition to which +// a row changed event should be dispatched. +func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) { + r.lock.Lock() + defer r.lock.Unlock() + r.hasher.Reset() + + r.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) + + dispatchCols := row.Columns + if len(dispatchCols) == 0 { + dispatchCols = row.PreColumns + } + + offsets, ok := row.TableInfo.ColumnsByNames(r.Columns) + if !ok { + log.Error("columns not found when dispatch event", + zap.Any("tableName", row.Table), + zap.Strings("columns", r.Columns)) + return 0, "", errors.ErrDispatcherFailed.GenWithStack( + "columns not found when dispatch event, table: %v, columns: %v", row.Table, r.Columns) + } + + for idx := 0; idx < len(r.Columns); idx++ { + r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) + } + + sum32 := r.hasher.Sum32() + return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10), nil +} diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go new file mode 100644 index 00000000000..af9ed4939ae --- /dev/null +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go @@ -0,0 +1,81 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package partition + +import ( + "testing" + + timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestColumnsDispatcher(t *testing.T) { + t.Parallel() + + event := &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t1", + }, + TableInfo: &model.TableInfo{ + TableInfo: &timodel.TableInfo{ + Columns: []*timodel.ColumnInfo{ + { + Name: timodel.CIStr{ + O: "col2", + }, + Offset: 1, + }, + { + Name: timodel.CIStr{ + O: "col1", + }, + Offset: 0, + }, + { + Name: timodel.CIStr{ + O: "col3", + }, + Offset: 2, + }, + }, + }, + }, + Columns: []*model.Column{ + { + Name: "col1", + Value: 11, + }, + { + Name: "col2", + Value: 22, + }, + { + Name: "col3", + Value: 33, + }, + }, + } + + p := NewColumnsDispatcher([]string{"col-2", "col-not-found"}) + _, _, err := p.DispatchRowChangedEvent(event, 16) + require.ErrorIs(t, err, errors.ErrDispatcherFailed) + + p = NewColumnsDispatcher([]string{"col2", "col1"}) + index, _, err := p.DispatchRowChangedEvent(event, 16) + require.NoError(t, err) + require.Equal(t, int32(15), index) +} diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index c1948fb22ec..ca7a3347066 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1424,11 +1424,19 @@ var doc = `{ "config.DispatchRule": { "type": "object", "properties": { + "columns": { + "description": "Columns are set when using columns dispatcher.", + "type": "array", + "items": { + "type": "string" + } + }, "dispatcher": { "description": "Deprecated, please use PartitionRule.", "type": "string" }, "index": { + "description": "IndexName is set when using index-value dispatcher with specified index.", "type": "string" }, "matcher": { @@ -2380,6 +2388,12 @@ var doc = `{ "v2.DispatchRule": { "type": "object", "properties": { + "columns": { + "type": "array", + "items": { + "type": "string" + } + }, "index": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 3bf59aae04b..d748dd19560 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1405,11 +1405,19 @@ "config.DispatchRule": { "type": "object", "properties": { + "columns": { + "description": "Columns are set when using columns dispatcher.", + "type": "array", + "items": { + "type": "string" + } + }, "dispatcher": { "description": "Deprecated, please use PartitionRule.", "type": "string" }, "index": { + "description": "IndexName is set when using index-value dispatcher with specified index.", "type": "string" }, "matcher": { @@ -2361,6 +2369,12 @@ "v2.DispatchRule": { "type": "object", "properties": { + "columns": { + "type": "array", + "items": { + "type": "string" + } + }, "index": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 3481eb99ca2..ce21f335acb 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -54,10 +54,17 @@ definitions: type: object config.DispatchRule: properties: + columns: + description: Columns are set when using columns dispatcher. + items: + type: string + type: array dispatcher: description: Deprecated, please use PartitionRule. type: string index: + description: IndexName is set when using index-value dispatcher with specified + index. type: string matcher: items: @@ -734,6 +741,10 @@ definitions: type: object v2.DispatchRule: properties: + columns: + items: + type: string + type: array index: type: string matcher: diff --git a/pkg/config/sink.go b/pkg/config/sink.go index e78ad20a205..44d59590db5 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -292,7 +292,12 @@ type DispatchRule struct { // PartitionRule is an alias added for DispatcherRule to mitigate confusions. // In the future release, the DispatcherRule is expected to be removed . PartitionRule string `toml:"partition" json:"partition"` - IndexName string `toml:"index" json:"index"` + + // IndexName is set when using index-value dispatcher with specified index. + IndexName string `toml:"index" json:"index"` + + // Columns are set when using columns dispatcher. + Columns []string `toml:"columns" json:"columns"` TopicRule string `toml:"topic" json:"topic"` } diff --git a/tests/integration_tests/mq_sink_dispatcher/conf/changefeed.toml b/tests/integration_tests/mq_sink_dispatcher/conf/changefeed.toml new file mode 100644 index 00000000000..849681cee76 --- /dev/null +++ b/tests/integration_tests/mq_sink_dispatcher/conf/changefeed.toml @@ -0,0 +1,4 @@ +[sink] +dispatchers = [ + {matcher = ['dispatcher.index'], partition = "index-value", index = "idx_a"} +] diff --git a/tests/integration_tests/mq_sink_dispatcher/conf/diff_config.toml b/tests/integration_tests/mq_sink_dispatcher/conf/diff_config.toml new file mode 100644 index 00000000000..d3cb63bac5d --- /dev/null +++ b/tests/integration_tests/mq_sink_dispatcher/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/dispatcher/output" + +source-instances = ["tidb0"] + +target-instance = "mysql1" + +target-check-tables = ["dispatcher.?*"] + +[data-sources] +[data-sources.tidb0] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.mysql1] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/mq_sink_dispatcher/conf/new_changefeed.toml b/tests/integration_tests/mq_sink_dispatcher/conf/new_changefeed.toml new file mode 100644 index 00000000000..4fbc029095c --- /dev/null +++ b/tests/integration_tests/mq_sink_dispatcher/conf/new_changefeed.toml @@ -0,0 +1,4 @@ +[sink] +dispatchers = [ + {matcher = ['dispatcher.index'], partition = "index-value", index = ""} +] diff --git a/tests/integration_tests/mq_sink_dispatcher/run.sh b/tests/integration_tests/mq_sink_dispatcher/run.sh new file mode 100644 index 00000000000..ae28b63c464 --- /dev/null +++ b/tests/integration_tests/mq_sink_dispatcher/run.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +set -e + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 +MAX_RETRIES=10 + +# use kafka-consumer with canal-json decoder to sync data from kafka to mysql +function run() { + if [ "$SINK_TYPE" != "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + TOPIC_NAME="dispatcher-test" + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info" + + changefeed_id="test" + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c ${changefeed_id} --config="$CUR/conf/changefeed.toml" + + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" "" + + run_sql "DROP DATABASE if exists dispatcher;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE DATABASE dispatcher;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE TABLE dispatcher.index (a int primary key, b int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_sql "INSERT INTO dispatcher.index values (1, 2);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "failed" "ErrDispatcherFailed" + + run_cdc_cli changefeed update -c ${changefeed_id} --sink-uri="$SINK_URI" --config="$CUR/conf/new_changefeed.toml" --no-confirm + + run_cdc_cli changefeed resume -c ${changefeed_id} + + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" "" + + cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/new_changefeed.toml" 2>&1 & + + run_sql "INSERT INTO dispatcher.index values (2, 3);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO dispatcher.index values (3, 4);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO dispatcher.index values (4, 5);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "UPDATE dispatcher.index set b = 5 where a = 1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "UPDATE dispatcher.index set b = 6 where a = 2;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "DELETE FROM dispatcher.index where a = 3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_sql "CREATE TABLE test.finish_mark (a int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 200 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 21fb8d504ab..0364ef6831f 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -14,7 +14,7 @@ mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_sui mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" -kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback" +kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback mq_sink_dispatcher" kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check" kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2"