Skip to content

Commit

Permalink
add more about column selector.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 30, 2023
1 parent df4162d commit 9898744
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 24 deletions.
10 changes: 10 additions & 0 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/validator"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -514,6 +515,15 @@ func (h APIV2HelpersImpl) getVerifiedTables(
return ineligibleTables, eligibleTables, nil
}

selectors, err := columnselector.New(replicaConfig)
if err != nil {
return nil, nil, err
}
err = selectors.VerifyTables(tableInfos)
if err != nil {
return nil, nil, err
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, protocol, topic, scheme)
if err != nil {
return nil, nil, err
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent,
}

for idx := 0; idx < len(r.Columns); idx++ {
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value)))
col := dispatchCols[offsets[idx]]
if col == nil {
continue
}
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(col.Value)))
}

sum32 := r.hasher.Sum32()
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven
"index not found when dispatch event, table: %v, index: %s", row.Table, r.IndexName)
}
for idx := 0; idx < len(names); idx++ {
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value)))
col := dispatchCols[offsets[idx]]
if col == nil {
continue
}
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(col.Value)))
}
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/column_selector"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -98,7 +98,7 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

trans, err := column_selector.New(replicaConfig)
trans, err := columnselector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/column_selector"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -105,7 +105,7 @@ func NewPulsarDMLSink(
return nil, errors.Trace(err)
}

trans, err := column_selector.New(replicaConfig)
trans, err := columnselector.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package column_selector
package columnselector

import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"
)

type selector struct {
Expand All @@ -30,14 +30,14 @@ func newSelector(
) (*selector, error) {
tableM, err := filter.Parse(rule.Matcher)
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, rule.Matcher)
return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Matcher)
}
if !caseSensitive {
tableM = filter.CaseInsensitive(tableM)
}
columnM, err := filter.ParseColumnFilter(rule.Columns)
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, rule.Columns)
return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Columns)
}

return &selector{
Expand All @@ -58,16 +58,28 @@ func (s *selector) Apply(event *model.RowChangedEvent) error {
return nil
}

for i := 0; i < len(event.Columns); i++ {
if !s.columnM.MatchColumn(event.Columns[i].Name) {
event.Columns[i] = nil
for idx, column := range event.Columns {
if s.columnM.MatchColumn(column.Name) {
continue
}
if column.Flag.IsHandleKey() || column.Flag.IsUniqueKey() {
return errors.ErrColumnSelectorFailed.GenWithStack(
"primary key or unique key cannot be filtered out, table: %v, column: %s",
event.Table, column.Name)
}
event.Columns[idx] = nil
}

for i := 0; i < len(event.PreColumns); i++ {
if !s.columnM.MatchColumn(event.PreColumns[i].Name) {
event.PreColumns[i] = nil
for idx, column := range event.PreColumns {
if s.columnM.MatchColumn(column.Name) {
continue
}
if column.Flag.IsHandleKey() || column.Flag.IsUniqueKey() {
return errors.ErrColumnSelectorFailed.GenWithStack(
"primary key or unique key cannot be filtered out, table: %v, column: %s",
event.Table, column.Name)
}
event.PreColumns[idx] = nil
}

return nil
Expand Down Expand Up @@ -95,10 +107,7 @@ func New(cfg *config.ReplicaConfig) (*ColumnSelector, error) {
}, nil
}

func (c *ColumnSelector) Match(_ *model.TableName) bool {
return true
}

// Apply the column selector to the given event.
func (c *ColumnSelector) Apply(event *model.RowChangedEvent) error {
for _, s := range c.selectors {
if s.Match(event.Table) {
Expand All @@ -107,3 +116,38 @@ func (c *ColumnSelector) Apply(event *model.RowChangedEvent) error {
}
return nil
}

// VerifyTables return the error if any given table cannot satisfy the column selector constraints.
// 1. if the column is filter out, it must not be a part of handle key or the unique key.
func (c *ColumnSelector) VerifyTables(infos []*model.TableInfo) error {
if len(c.selectors) == 0 {
return nil
}

for _, table := range infos {
for _, s := range c.selectors {
if !s.Match(&table.TableName) {
continue
}
for columnID, flag := range table.ColumnsFlag {
columnInfo, ok := table.GetColumnInfo(columnID)
if !ok {
return errors.ErrColumnSelectorFailed.GenWithStack(
"column not found when verify the table for the column selector, table: %v, column: %s",
table.TableName, columnInfo.Name)
}

if s.columnM.MatchColumn(columnInfo.Name.O) {
continue
}
if flag.IsHandleKey() || flag.IsUniqueKey() {
return errors.ErrColumnSelectorFailed.GenWithStack(
"primary key or unique key cannot be filtered out, table: %v, column: %s",
table.TableName, columnInfo.Name)
}
}
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package column_selector
package columnselector

import (
"testing"
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ error = '''
Codec invalid config
'''

["CDC:ErrColumnSelectorFailed"]
error = '''
column selector failed
'''

["CDC:ErrCompressionFailed"]
error = '''
Compression failed
Expand Down
6 changes: 3 additions & 3 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ var (
errors.RFCCodeText("CDC:ErrDispatcherFailed"),
)

ErrTransformerFailed = errors.Normalize(
"transformer failed",
errors.RFCCodeText("CDC:ErrTransformFailed"),
ErrColumnSelectorFailed = errors.Normalize(
"column selector failed",
errors.RFCCodeText("CDC:ErrColumnSelectorFailed"),
)

// internal errors
Expand Down

0 comments on commit 9898744

Please sign in to comment.