sink(ticdc): split the update events all at once (#9688)
close #9689
3AceShowHand authored Sep 14, 2023
1 parent 3fc52b4 commit a2819ad
Showing 26 changed files with 167 additions and 162 deletions.
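
This commit threads the sink's URI scheme into update-event handling: TrySplitAndSortUpdateEvent now takes the scheme as an argument, so unique-key-changed update events are split once, at the sink level, instead of inside each sink worker. A minimal caller sketch (illustrative only; writeTxn and its wiring are hypothetical, not part of this diff):

    // Hypothetical caller: the sink's scheme drives the split decision.
    func writeTxn(txn *model.SingleTableTxn, scheme string) error {
        // Kafka/storage schemes split unique-key updates into delete+insert;
        // MySQL-compatible schemes keep single-row update events intact.
        if err := txn.TrySplitAndSortUpdateEvent(scheme); err != nil {
            return err
        }
        // ... hand txn.Rows to the sink for writing ...
        return nil
    }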
2 changes: 1 addition & 1 deletion cdc/api/v2/model_test.go
@@ -50,7 +50,7 @@ var defaultAPIConfig = &ReplicaConfig{
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
-EncoderConcurrency: util.AddressOf(16),
+EncoderConcurrency: util.AddressOf(config.DefaultEncoderGroupConcurrency),
Terminator: util.AddressOf(config.CRLF),
DateSeparator: util.AddressOf(config.DateSeparatorDay.String()),
EnablePartitionSeparator: util.AddressOf(true),
25 changes: 21 additions & 4 deletions cdc/model/sink.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
@@ -271,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts {
}

// TrySplitAndSortUpdateEvent does nothing for the redo log.
-func (r *RedoLog) TrySplitAndSortUpdateEvent() error {
+func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error {
return nil
}

@@ -379,7 +380,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent does nothing.
-func (r *RowChangedEvent) TrySplitAndSortUpdateEvent() error {
+func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error {
return nil
}

@@ -767,8 +768,8 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent splits update events if a unique key is updated
-func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
-if len(t.Rows) < 2 {
+func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
+if !t.shouldSplitUpdateEvent(scheme) {
return nil
}
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
@@ -779,6 +780,22 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
return nil
}

+// Whether to split a single update event into delete and insert events.
+//
+// For the MySQL sink, there is no need to split a single unique key changed
+// update event; this also keeps backward compatibility, preserving the same
+// behavior as before.
+//
+// For the Kafka and storage sinks, always split a single unique key changed
+// update event, since:
+// 1. Avro and CSV do not output the previous column values for update events,
+// so consumers would miss data if a unique key changed event were not split.
+// 2. The Index-Value dispatcher cannot work correctly if a unique key changed
+// event is not split.
+func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
+if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) {
+return false
+}
+return true
+}

// trySplitAndSortUpdateEvent tries to split update events if a unique key is updated;
// it returns true if any update events are split
func trySplitAndSortUpdateEvent(
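
In effect, only a single-row transaction bound for a MySQL-compatible sink skips the split; multi-row transactions, and every transaction bound for a message-queue or storage sink, go through trySplitAndSortUpdateEvent. A quick illustration (a sketch, reusing the ukUpdatedEvent fixture defined in the test file below):

    txn := &SingleTableTxn{Rows: []*RowChangedEvent{ukUpdatedEvent}}
    txn.shouldSplitUpdateEvent(sink.MySQLScheme) // false: one row, MySQL-compatible sink
    txn.shouldSplitUpdateEvent(sink.KafkaScheme) // true: MQ and storage sinks always split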
46 changes: 46 additions & 0 deletions cdc/model/sink_test.go
@@ -20,6 +20,7 @@ import (
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -563,3 +564,48 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result))
}

+var ukUpdatedEvent = &RowChangedEvent{
+PreColumns: []*Column{
+{
+Name: "col1",
+Flag: BinaryFlag,
+Value: "col1-value",
+},
+{
+Name: "col2",
+Flag: HandleKeyFlag | UniqueKeyFlag,
+Value: "col2-value",
+},
+},
+
+Columns: []*Column{
+{
+Name: "col1",
+Flag: BinaryFlag,
+Value: "col1-value",
+},
+{
+Name: "col2",
+Flag: HandleKeyFlag | UniqueKeyFlag,
+Value: "col2-value-updated",
+},
+},
+}
+
+func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
+txn := &SingleTableTxn{
+Rows: []*RowChangedEvent{ukUpdatedEvent},
+}
+
+err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
+require.NoError(t, err)
+require.Len(t, txn.Rows, 2)
+
+txn = &SingleTableTxn{
+Rows: []*RowChangedEvent{ukUpdatedEvent},
+}
+err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
+require.NoError(t, err)
+require.Len(t, txn.Rows, 1)
+}
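
A natural follow-up to this test (illustrative only, not part of the diff) would also pin down the kinds and order of the two split events, matching the delete-before-insert ordering that the removed worker code below relied on:

    // Hypothetical extra assertions after the Kafka-scheme split:
    require.True(t, txn.Rows[0].IsDelete()) // delete carrying the old unique key value
    require.True(t, txn.Rows[1].IsInsert()) // insert carrying the new unique key value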
5 changes: 1 addition & 4 deletions cdc/processor/sinkmanager/redo_log_worker.go
@@ -142,10 +142,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
-x, size, err = handleRowChangedEvents(w.changefeedID, task.span, e)
-if err != nil {
-return errors.Trace(err)
-}
+x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}

6 changes: 1 addition & 5 deletions cdc/processor/sinkmanager/table_sink_worker.go
@@ -250,11 +250,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
-x, size, err := handleRowChangedEvents(w.changefeedID, task.span, e)
-if err != nil {
-return err
-}
-
+x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
allEventSize += size
}
59 changes: 4 additions & 55 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -441,7 +441,7 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6
func handleRowChangedEvents(
changefeed model.ChangeFeedID, span tablepb.Span,
events ...*model.PolymorphicEvent,
-) ([]*model.RowChangedEvent, uint64, error) {
+) ([]*model.RowChangedEvent, uint64) {
size := 0
rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events))
for _, e := range events {
@@ -455,7 +455,6 @@ func handleRowChangedEvents(
}

rowEvent := e.Row
-
// Some transactions could generate empty row change events, such as
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
@@ -468,60 +467,10 @@
continue
}

-if !(rowEvent.IsUpdate() && shouldSplitUpdateEvent(rowEvent)) {
-size += e.Row.ApproximateBytes()
-rowChangedEvents = append(rowChangedEvents, e.Row)
-continue
-}
-
-deleteEvent, insertEvent, err := splitUpdateEvent(rowEvent)
-if err != nil {
-return nil, 0, errors.Trace(err)
-}
-// NOTICE: Please do not change the order, the delete event always comes before the insert event.
-size += deleteEvent.ApproximateBytes()
-size += insertEvent.ApproximateBytes()
-rowChangedEvents = append(rowChangedEvents, deleteEvent, insertEvent)
-}
-return rowChangedEvents, uint64(size), nil
-}
-
-// shouldSplitUpdateEvent return true if the unique key column is modified.
-func shouldSplitUpdateEvent(updateEvent *model.RowChangedEvent) bool {
-// nil event will never be split.
-if updateEvent == nil {
-return false
-}
-
-for i := range updateEvent.Columns {
-col := updateEvent.Columns[i]
-preCol := updateEvent.PreColumns[i]
-if col != nil && (col.Flag.IsUniqueKey() || col.Flag.IsHandleKey()) &&
-preCol != nil && (preCol.Flag.IsUniqueKey() || preCol.Flag.IsHandleKey()) {
-colValueString := model.ColumnValueString(col.Value)
-preColValueString := model.ColumnValueString(preCol.Value)
-if colValueString != preColValueString {
-return true
-}
-}
-}
-
-return false
-}
-
-// splitUpdateEvent splits an update event into a delete and an insert event.
-func splitUpdateEvent(updateEvent *model.RowChangedEvent) (*model.RowChangedEvent, *model.RowChangedEvent, error) {
-if updateEvent == nil {
-return nil, nil, errors.New("nil event cannot be split")
+size += rowEvent.ApproximateBytes()
+rowChangedEvents = append(rowChangedEvents, rowEvent)
}
-
-deleteEvent := *updateEvent
-deleteEvent.Columns = nil
-
-// set the `PreColumns` to nil to make the update into an insert.
-updateEvent.PreColumns = nil
-
-return &deleteEvent, updateEvent, nil
+return rowChangedEvents, uint64(size)
}

func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
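
The per-event split logic removed above now runs once at the sink level, via SingleTableTxn.TrySplitAndSortUpdateEvent. Conceptually the split itself is unchanged; a sketch mirroring the removed splitUpdateEvent (error handling elided):

    // An update event becomes a delete followed by an insert.
    func split(update *model.RowChangedEvent) (del, ins *model.RowChangedEvent) {
        d := *update
        d.Columns = nil         // the delete event keeps only PreColumns
        update.PreColumns = nil // dropping PreColumns turns the update into an insert
        return &d, update       // the delete must come before the insert
    }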
75 changes: 11 additions & 64 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
@@ -50,6 +51,10 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh
return nil
}

+func (m *mockSink) Scheme() string {
+return sink.BlackHoleScheme
+}

func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] {
m.mu.Lock()
defer m.mu.Unlock()
@@ -153,8 +158,7 @@ func TestHandleNilRowChangedEvents(t *testing.T) {
events := []*model.PolymorphicEvent{nil}
changefeedID := model.DefaultChangeFeedID("1")
span := spanz.TableIDToComparableSpan(1)
-result, size, err := handleRowChangedEvents(changefeedID, span, events...)
-require.NoError(t, err)
+result, size := handleRowChangedEvents(changefeedID, span, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}
@@ -176,81 +180,25 @@ func TestHandleEmptyRowChangedEvents(t *testing.T) {
changefeedID := model.DefaultChangeFeedID("1")
span := spanz.TableIDToComparableSpan(1)

-result, size, err := handleRowChangedEvents(changefeedID, span, events...)
-require.NoError(t, err)
+result, size := handleRowChangedEvents(changefeedID, span, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}

-func TestHandleRowChangedEventsUniqueKeyColumnUpdated(t *testing.T) {
-t.Parallel()
-
-columns := []*model.Column{
-{
-Name: "col1",
-Flag: model.BinaryFlag,
-Value: "col1-value-updated",
-},
-{
-Name: "col2",
-Flag: model.HandleKeyFlag | model.UniqueKeyFlag,
-Value: "col2-value-updated",
-},
-}
-preColumns := []*model.Column{
-{
-Name: "col1",
-Flag: model.BinaryFlag,
-Value: "col1-value",
-},
-{
-Name: "col2",
-Flag: model.HandleKeyFlag | model.UniqueKeyFlag,
-Value: "col2-value",
-},
-}
-
-// the handle key updated, should be split into 2 events
-events := []*model.PolymorphicEvent{
-{
-CRTs: 1,
-RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
-Row: &model.RowChangedEvent{
-CommitTs: 1,
-Columns: columns,
-PreColumns: preColumns,
-Table: &model.TableName{
-Schema: "test",
-Table: "test",
-},
-},
-},
-}
-changefeedID := model.DefaultChangeFeedID("1")
-span := spanz.TableIDToComparableSpan(1)
-result, size, err := handleRowChangedEvents(changefeedID, span, events...)
-require.NoError(t, err)
-require.Equal(t, 2, len(result))
-require.Equal(t, uint64(448), size)
-
-require.True(t, result[0].IsDelete())
-require.True(t, result[1].IsInsert())
-}
-
-func TestHandleRowChangedEventsNonUniqueKeyColumnUpdated(t *testing.T) {
+func TestHandleRowChangedEventNormalEvent(t *testing.T) {
t.Parallel()

-// Update non-unique key.
columns := []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
Value: "col1-value",
},
{
Name: "col2",
Flag: model.HandleKeyFlag | model.UniqueKeyFlag,
Value: "col2-value",
Value: "col2-value-updated",
},
}
preColumns := []*model.Column{
@@ -283,8 +231,7 @@ }
}
changefeedID := model.DefaultChangeFeedID("1")
span := spanz.TableIDToComparableSpan(1)
-result, size, err := handleRowChangedEvents(changefeedID, span, events...)
-require.NoError(t, err)
+result, size := handleRowChangedEvents(changefeedID, span, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(224), size)
}
3 changes: 1 addition & 2 deletions cdc/sink/ddlsink/factory/factory.go
@@ -16,7 +16,6 @@ package factory
import (
"context"
"net/url"
"strings"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
@@ -46,7 +45,7 @@ func New(
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
-scheme := strings.ToLower(sinkURI.Scheme)
+scheme := sink.GetScheme(sinkURI)
switch scheme {
case sink.KafkaScheme, sink.KafkaSSLScheme:
factoryCreator := kafka.NewSaramaFactory
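
Scheme extraction is now centralized in the sink package instead of being inlined at each call site. Presumably sink.GetScheme is equivalent to the expression it replaces; a sketch under that assumption (the real helper may validate or normalize further):

    func GetScheme(uri *url.URL) string {
        return strings.ToLower(uri.Scheme)
    }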
6 changes: 6 additions & 0 deletions cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
@@ -17,6 +17,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/pkg/sink"
"go.uber.org/zap"
)

@@ -42,6 +43,11 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang
return nil
}

+// Scheme returns the scheme of the sink.
+func (s *DMLSink) Scheme() string {
+return sink.BlackHoleScheme
+}

// Close does nothing.
func (s *DMLSink) Close() {}

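
Each DML sink now reports its URI scheme, which is how the table sink layer knows which scheme to pass into TrySplitAndSortUpdateEvent. A sketch of the presumed interface extension (illustrative; the actual EventSink definition in cdc/sink/dmlsink may carry additional methods):

    type EventSink[E dmlsink.TableEvent] interface {
        WriteEvents(events ...*dmlsink.CallbackableEvent[E]) error
        Scheme() string // new in this change
        Close()
    }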