Skip to content

Commit

Permalink
add more unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 1, 2023
1 parent 87986ce commit 700fe04
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 17 deletions.
9 changes: 4 additions & 5 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,21 +515,20 @@ func (h APIV2HelpersImpl) getVerifiedTables(
return ineligibleTables, eligibleTables, nil
}

selectors, err := columnselector.New(replicaConfig)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, protocol, topic, scheme)
if err != nil {
return nil, nil, err
}
err = selectors.VerifyTables(tableInfos)
err = eventRouter.VerifyTables(tableInfos)
if err != nil {
return nil, nil, err
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, protocol, topic, scheme)
selectors, err := columnselector.New(replicaConfig)
if err != nil {
return nil, nil, err
}

err = eventRouter.VerifyTables(tableInfos)
err = selectors.VerifyTables(tableInfos, eventRouter)
if err != nil {
return nil, nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,14 @@ func (s *EventRouter) GetPartitionForRowChange(
row *model.RowChangedEvent,
partitionNum int32,
) (int32, string, error) {
_, partitionDispatcher := s.matchDispatcher(
row.Table.Schema, row.Table.Table,
)
return s.GetPartitionDispatcher(row.Table.Schema, row.Table.Table).
DispatchRowChangedEvent(row, partitionNum)
}

return partitionDispatcher.DispatchRowChangedEvent(
row, partitionNum,
)
// GetPartitionDispatcher returns the partition dispatcher for a specific table.
func (s *EventRouter) GetPartitionDispatcher(schema, table string) partition.Dispatcher {
_, partitionDispatcher := s.matchDispatcher(schema, table)
return partitionDispatcher
}

// VerifyTables return error if any one table route rule is invalid.
Expand Down
29 changes: 24 additions & 5 deletions cdc/sink/dmlsink/mq/transformer/columnselector/column_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package columnselector
import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/partition"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
)
Expand Down Expand Up @@ -52,8 +54,8 @@ func (s *selector) Match(schema, table string) bool {
}

// Apply implements Transformer interface
// return error if the given event cannot match the selector, it's the caller's
// responsibility make sure the given event match the selector first before apply it.
// return error if the given event cannot match the selector, or the column cannot be filtered out.
// the caller's should make sure the given event match the selector first before apply it.
func (s *selector) Apply(event *model.RowChangedEvent) error {
// defensive check, this should not happen.
if !s.Match(event.Table.Schema, event.Table.Table) {
Expand Down Expand Up @@ -122,7 +124,10 @@ func (c *ColumnSelector) Apply(event *model.RowChangedEvent) error {

// VerifyTables return the error if any given table cannot satisfy the column selector constraints.
// 1. if the column is filter out, it must not be a part of handle key or the unique key.
func (c *ColumnSelector) VerifyTables(infos []*model.TableInfo) error {
// 2. if the filtered out column is used in the column dispatcher, return error.
func (c *ColumnSelector) VerifyTables(
infos []*model.TableInfo, eventRouter *dispatcher.EventRouter,
) error {
if len(c.selectors) == 0 {
return nil
}
Expand All @@ -143,20 +148,34 @@ func (c *ColumnSelector) VerifyTables(infos []*model.TableInfo) error {
if s.columnM.MatchColumn(columnInfo.Name.O) {
continue
}
// the column is filter out.
if flag.IsHandleKey() || flag.IsUniqueKey() {
return errors.ErrColumnSelectorFailed.GenWithStack(
"primary key or unique key cannot be filtered out by the column selector, "+
"table: %v, column: %s", table.TableName, columnInfo.Name)
}

partitionDispatcher := eventRouter.GetPartitionDispatcher(table.TableName.Schema, table.TableName.Table)
switch v := partitionDispatcher.(type) {
case *partition.ColumnsDispatcher:
for _, col := range v.Columns {
if col == columnInfo.Name.O {
return errors.ErrColumnSelectorFailed.GenWithStack(
"the filtered out column is used in the column dispatcher, "+
"table: %v, column: %s", table.TableName, columnInfo.Name)
}
}
default:
}
}
}
}

return nil
}

// Match return true if the given `schema.table` column is matched.
func (c *ColumnSelector) Match(schema, table, column string) bool {
// VerifyColumn return true if the given `schema.table` column is matched.
func (c *ColumnSelector) VerifyColumn(schema, table, column string) bool {
for _, s := range c.selectors {
if !s.Match(schema, table) {
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package columnselector
import (
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -198,3 +201,96 @@ func TestNewColumnSelector(t *testing.T) {
require.Nil(t, event.Columns[2])
require.Equal(t, []byte("coA1"), event.Columns[3].Value)
}

func TestVerifyTableColumnNotAllowFiltered(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{
{
Matcher: []string{"test.*"},
Columns: []string{"b"},
},
}
selector, err := New(replicaConfig)
require.NoError(t, err)

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, config.ProtocolDefault, "default", "default")
require.NoError(t, err)

info := &timodel.TableInfo{
Name: timodel.CIStr{O: "t1", L: "t1"},
Columns: []*timodel.ColumnInfo{
{
ID: 0,
Name: timodel.CIStr{O: "a", L: "a"},
Offset: 0,
},
{
ID: 1,
Name: timodel.CIStr{O: "b", L: "b"},
Offset: 1,
},
{
ID: 2,
Name: timodel.CIStr{O: "c", L: "c"},
Offset: 2,
},
},
}
table := model.WrapTableInfo(0, "test", 0, info)
table.ColumnsFlag[0] = model.HandleKeyFlag
infos := []*model.TableInfo{table}

// column `a` is handle key, but it is filter out, return error.
err = selector.VerifyTables(infos, eventRouter)
require.ErrorIs(t, err, cerror.ErrColumnSelectorFailed)
}

func TestVerifyTablesColumnFilteredInDispatcher(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{
{
Matcher: []string{"test.*"},
Columns: []string{"a", "b"},
},
}
replicaConfig.Sink.DispatchRules = []*config.DispatchRule{
{
Matcher: []string{"test.*"},
PartitionRule: "columns",
Columns: []string{"c"},
},
}

selectors, err := New(replicaConfig)
require.NoError(t, err)

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, config.ProtocolDefault, "default", "default")
require.NoError(t, err)

info := &timodel.TableInfo{
Name: timodel.CIStr{O: "t1", L: "t1"},
Columns: []*timodel.ColumnInfo{
{
ID: 0,
Name: timodel.CIStr{O: "a", L: "a"},
Offset: 0,
},
{
ID: 1,
Name: timodel.CIStr{O: "b", L: "b"},
Offset: 1,
},
{
ID: 2,
Name: timodel.CIStr{O: "c", L: "c"},
Offset: 2,
},
},
}

table := model.WrapTableInfo(0, "test", 0, info)
// column `c` is filter out, but it is used in the column dispatcher, return error.
infos := []*model.TableInfo{table}
err = selectors.VerifyTables(infos, eventRouter)
require.ErrorIs(t, err, cerror.ErrColumnSelectorFailed)
}
2 changes: 1 addition & 1 deletion tests/utils/checksum_checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func getColumns(tx *sql.Tx, schema, table string, selector *columnselector.Colum
if err := rows.Scan(&t.Field, &t.Type, &t.Null, &t.Key, &t.Default, &t.Extra); err != nil {
return result, errors.Trace(err)
}
if selector.Match(schema, table, t.Field) {
if selector.VerifyColumn(schema, table, t.Field) {
result = append(result, t.Field)
}
}
Expand Down

0 comments on commit 700fe04

Please sign in to comment.