Skip to content

Commit

Permalink
add column selector.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 27, 2023
1 parent 67919da commit c7ee81e
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 51 deletions.
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/column_selector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -98,7 +98,7 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

trans, err := transformer.NewColumnSelector(replicaConfig)
trans, err := column_selector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
return errors.Trace(err)
}

err = s.alive.transformer.Transform(row)
err = s.alive.transformer.Apply(row)
if err != nil {
s.cancel(err)
return errors.Trace(err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Inc.
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package transformer
package column_selector

import (
filter "github.com/pingcap/tidb/util/table-filter"
Expand All @@ -20,35 +20,14 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

// Transformer is the interface for transform the event.
type Transformer interface {
Match(table *model.TableName) bool
Transform(event *model.RowChangedEvent) error
}

type ColumnSelectors []*columnSelector

func (c ColumnSelectors) Match(_ *model.TableName) bool {
return true
}

func (c ColumnSelectors) Transform(event *model.RowChangedEvent) error {
for _, selector := range c {
if selector.Match(event.Table) {
return selector.Transform(event)
}
}
return nil
}

type columnSelector struct {
type selector struct {
tableF filter.Filter
columnM filter.ColumnFilter
}

func newColumnSelector(
func newSelector(
rule *config.ColumnSelector, caseSensitive bool,
) (*columnSelector, error) {
) (*selector, error) {
tableM, err := filter.Parse(rule.Matcher)
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, rule.Matcher)
Expand All @@ -60,19 +39,20 @@ func newColumnSelector(
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, rule.Columns)
}
return &columnSelector{

return &selector{
tableF: tableM,
columnM: columnM,
}, nil
}

// Match implements Transformer interface
func (s *columnSelector) Match(table *model.TableName) bool {
func (s *selector) Match(table *model.TableName) bool {
return s.tableF.MatchTable(table.Schema, table.Table)
}

// Transform implements Transformer interface
func (s *columnSelector) Transform(event *model.RowChangedEvent) error {
// Apply implements Transformer interface
func (s *selector) Apply(event *model.RowChangedEvent) error {
// the event does not match the table filter, skip it
if !s.tableF.MatchTable(event.Table.Schema, event.Table.Table) {
return nil
Expand All @@ -92,16 +72,37 @@ func (s *columnSelector) Transform(event *model.RowChangedEvent) error {
return nil
}

// NewColumnSelector return a column selector
func NewColumnSelector(cfg *config.ReplicaConfig) (ColumnSelectors, error) {
result := make(ColumnSelectors, 0, len(cfg.Sink.ColumnSelectors))
// ColumnSelector manages an array of selectors, the first selector match the given
// event is used to select out columns.
type ColumnSelector struct {
selectors []*selector
}

// New return a column selector
func New(cfg *config.ReplicaConfig) (*ColumnSelector, error) {
selectors := make([]*selector, 0, len(cfg.Sink.ColumnSelectors))
for _, r := range cfg.Sink.ColumnSelectors {
selector, err := newColumnSelector(r, cfg.CaseSensitive)
selector, err := newSelector(r, cfg.CaseSensitive)
if err != nil {
return nil, err
}
result = append(result, selector)
selectors = append(selectors, selector)
}

return result, nil
return &ColumnSelector{
selectors: selectors,
}, nil
}

func (c *ColumnSelector) Match(_ *model.TableName) bool {
return true
}

func (c *ColumnSelector) Apply(event *model.RowChangedEvent) error {
for _, s := range c.selectors {
if s.Match(event.Table) {
return s.Apply(event)
}
}
return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Inc.
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package transformer
package column_selector

import (
"testing"
Expand Down Expand Up @@ -59,14 +59,14 @@ var (
)

func TestNewColumnSelectorNoRules(t *testing.T) {
// the column selector rule is not set
// the column selector is not set
replicaConfig := config.GetDefaultReplicaConfig()
selectors, err := NewColumnSelector(replicaConfig)
selectors, err := New(replicaConfig)
require.NoError(t, err)
require.NotNil(t, selectors)
require.Len(t, selectors, 0)
require.Len(t, selectors.selectors, 0)

err = selectors.Transform(event)
err = selectors.Apply(event)
require.NoError(t, err)
for _, column := range event.Columns {
require.NotNil(t, column.Value)
Expand Down Expand Up @@ -96,12 +96,12 @@ func TestNewColumnSelector(t *testing.T) {
Columns: []string{"co?1"},
},
}
selectors, err := NewColumnSelector(replicaConfig)
selectors, err := New(replicaConfig)
require.NoError(t, err)
require.Len(t, selectors, 4)
require.Len(t, selectors.selectors, 4)

// column3 is filter out, set to nil.
err = selectors.Transform(event)
err = selectors.Apply(event)
require.NoError(t, err)
require.Equal(t, []byte("val1"), event.Columns[0].Value)
require.Equal(t, []byte("val2"), event.Columns[1].Value)
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestNewColumnSelector(t *testing.T) {
},
}
// the first column `a` is filter out, set to nil.
err = selectors.Transform(event)
err = selectors.Apply(event)
require.NoError(t, err)
require.Nil(t, event.Columns[0].Value)
require.Equal(t, []byte("b"), event.Columns[1].Value)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestNewColumnSelector(t *testing.T) {
},
},
}
err = selectors.Transform(event)
err = selectors.Apply(event)
require.NoError(t, err)
require.Equal(t, []byte("col"), event.Columns[0].Value)
require.Equal(t, []byte("col1"), event.Columns[1].Value)
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestNewColumnSelector(t *testing.T) {
},
},
}
err = selectors.Transform(event)
err = selectors.Apply(event)
require.NoError(t, err)
require.Nil(t, event.Columns[0].Value)
require.Equal(t, []byte("col1"), event.Columns[1].Value)
Expand Down
21 changes: 21 additions & 0 deletions cdc/sink/dmlsink/mq/transformer/transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package transformer

import "github.com/pingcap/tiflow/cdc/model"

// Transformer is the interface for transform the event.
type Transformer interface {
Apply(event *model.RowChangedEvent) error
}

0 comments on commit c7ee81e

Please sign in to comment.