Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): split the update events all at once #9688

Merged
merged 18 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var defaultAPIConfig = &ReplicaConfig{
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
EncoderConcurrency: util.AddressOf(16),
EncoderConcurrency: util.AddressOf(config.DefaultEncoderGroupConcurrency),
Terminator: util.AddressOf(config.CRLF),
DateSeparator: util.AddressOf(config.DateSeparatorDay.String()),
EnablePartitionSeparator: util.AddressOf(true),
Expand Down
25 changes: 21 additions & 4 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -271,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts {
}

// TrySplitAndSortUpdateEvent redo log do nothing
func (r *RedoLog) TrySplitAndSortUpdateEvent() error {
// TrySplitAndSortUpdateEvent is a no-op for RedoLog: redo log events are
// never split. The sink scheme argument is ignored.
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error {
	return nil
}

Expand Down Expand Up @@ -379,7 +380,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent do nothing
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent() error {
// TrySplitAndSortUpdateEvent is a no-op for a single RowChangedEvent;
// splitting is only meaningful at the transaction level. The sink scheme
// argument is ignored.
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error {
	return nil
}

Expand Down Expand Up @@ -767,8 +768,8 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
if len(t.Rows) < 2 {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
if !t.shouldSplitUpdateEvent(scheme) {
return nil
}
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
Expand All @@ -779,6 +780,22 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
return nil
}

3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
// shouldSplitUpdateEvent reports whether the update events in this
// transaction have to be split into delete + insert pairs.
//
// A MySQL-compatible sink never splits a single-row transaction, which
// keeps the backward-compatible behavior of earlier releases.
//
// The Kafka and Storage sinks always split a unique-key-changed update
// event, because:
//  1. Avro and CSV do not carry the previous column values of an update
//     event, so consumers would miss data if such an event were not split.
//  2. The Index-Value dispatcher cannot route a unique-key-changed update
//     event correctly unless it is split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
	isSingleRowTxn := len(t.Rows) < 2
	return !(isSingleRowTxn && sink.IsMySQLCompatibleScheme(sinkScheme))
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some updated events is split
func trySplitAndSortUpdateEvent(
Expand Down
46 changes: 46 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -563,3 +564,48 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result))
}

// ukUpdatedEvent is a shared fixture: a single update event whose
// unique/handle key column ("col2") changes value, so sinks that split
// unique-key-changed updates must turn it into a delete + insert pair.
//
// NOTE(review): this fixture is a pointer shared across tests; a split may
// mutate the event in place — verify tests do not depend on each other.
var ukUpdatedEvent = &RowChangedEvent{
	// Row image before the update.
	PreColumns: []*Column{
		{
			Name:  "col1",
			Flag:  BinaryFlag,
			Value: "col1-value",
		},
		{
			// Unique/handle key column; its value changes below.
			Name:  "col2",
			Flag:  HandleKeyFlag | UniqueKeyFlag,
			Value: "col2-value",
		},
	},

	// Row image after the update.
	Columns: []*Column{
		{
			Name:  "col1",
			Flag:  BinaryFlag,
			Value: "col1-value",
		},
		{
			Name:  "col2",
			Flag:  HandleKeyFlag | UniqueKeyFlag,
			Value: "col2-value-updated",
		},
	},
}

func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
txn := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
}
5 changes: 1 addition & 4 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err = handleRowChangedEvents(w.changefeedID, task.span, e)
if err != nil {
return errors.Trace(err)
}
x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}

Expand Down
6 changes: 1 addition & 5 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err := handleRowChangedEvents(w.changefeedID, task.span, e)
if err != nil {
return err
}

x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
allEventSize += size
}
Expand Down
59 changes: 4 additions & 55 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6
func handleRowChangedEvents(
changefeed model.ChangeFeedID, span tablepb.Span,
events ...*model.PolymorphicEvent,
) ([]*model.RowChangedEvent, uint64, error) {
) ([]*model.RowChangedEvent, uint64) {
size := 0
rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events))
for _, e := range events {
Expand All @@ -455,7 +455,6 @@ func handleRowChangedEvents(
}

rowEvent := e.Row

// Some transactions could generate empty row change event, such as
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
Expand All @@ -468,60 +467,10 @@ func handleRowChangedEvents(
continue
}

if !(rowEvent.IsUpdate() && shouldSplitUpdateEvent(rowEvent)) {
size += e.Row.ApproximateBytes()
rowChangedEvents = append(rowChangedEvents, e.Row)
continue
}

deleteEvent, insertEvent, err := splitUpdateEvent(rowEvent)
if err != nil {
return nil, 0, errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
size += deleteEvent.ApproximateBytes()
size += insertEvent.ApproximateBytes()
rowChangedEvents = append(rowChangedEvents, deleteEvent, insertEvent)
}
return rowChangedEvents, uint64(size), nil
}

// shouldSplitUpdateEvent return true if the unique key column is modified.
func shouldSplitUpdateEvent(updateEvent *model.RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
}

for i := range updateEvent.Columns {
col := updateEvent.Columns[i]
preCol := updateEvent.PreColumns[i]
if col != nil && (col.Flag.IsUniqueKey() || col.Flag.IsHandleKey()) &&
preCol != nil && (preCol.Flag.IsUniqueKey() || preCol.Flag.IsHandleKey()) {
colValueString := model.ColumnValueString(col.Value)
preColValueString := model.ColumnValueString(preCol.Value)
if colValueString != preColValueString {
return true
}
}
}

return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(updateEvent *model.RowChangedEvent) (*model.RowChangedEvent, *model.RowChangedEvent, error) {
if updateEvent == nil {
return nil, nil, errors.New("nil event cannot be split")
size += rowEvent.ApproximateBytes()
rowChangedEvents = append(rowChangedEvents, rowEvent)
}

deleteEvent := *updateEvent
deleteEvent.Columns = nil

// set the `PreColumns` to nil to make the update into an insert.
updateEvent.PreColumns = nil

return &deleteEvent, updateEvent, nil
return rowChangedEvents, uint64(size)
}

func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
Expand Down
75 changes: 11 additions & 64 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand All @@ -50,6 +51,10 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh
return nil
}

// Scheme returns the scheme of the sink; the mock reports the blackhole
// scheme.
func (m *mockSink) Scheme() string {
	return sink.BlackHoleScheme
}

func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -153,8 +158,7 @@ func TestHandleNilRowChangedEvents(t *testing.T) {
events := []*model.PolymorphicEvent{nil}
changefeedID := model.DefaultChangeFeedID("1")
span := spanz.TableIDToComparableSpan(1)
result, size, err := handleRowChangedEvents(changefeedID, span, events...)
require.NoError(t, err)
result, size := handleRowChangedEvents(changefeedID, span, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}
Expand All @@ -176,81 +180,25 @@ func TestHandleEmptyRowChangedEvents(t *testing.T) {
changefeedID := model.DefaultChangeFeedID("1")
span := spanz.TableIDToComparableSpan(1)

result, size, err := handleRowChangedEvents(changefeedID, span, events...)
require.NoError(t, err)
result, size := handleRowChangedEvents(changefeedID, span, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}

func TestHandleRowChangedEventsUniqueKeyColumnUpdated(t *testing.T) {
t.Parallel()

columns := []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: model.HandleKeyFlag | model.UniqueKeyFlag,
Value: "col2-value-updated",
},
}
preColumns := []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: model.HandleKeyFlag | model.UniqueKeyFlag,
Value: "col2-value",
},
}

// the handle key updated, should be split into 2 events
events := []*model.PolymorphicEvent{
{
CRTs: 1,
RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Row: &model.RowChangedEvent{
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
Table: &model.TableName{
Schema: "test",
Table: "test",
},
},
},
}
changefeedID := model.DefaultChangeFeedID("1")
span := spanz.TableIDToComparableSpan(1)
result, size, err := handleRowChangedEvents(changefeedID, span, events...)
require.NoError(t, err)
require.Equal(t, 2, len(result))
require.Equal(t, uint64(448), size)

require.True(t, result[0].IsDelete())
require.True(t, result[1].IsInsert())
}

func TestHandleRowChangedEventsNonUniqueKeyColumnUpdated(t *testing.T) {
func TestHandleRowChangedEventNormalEvent(t *testing.T) {
t.Parallel()

// Update non-unique key.
columns := []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
Value: "col1-value",
},
{
Name: "col2",
Flag: model.HandleKeyFlag | model.UniqueKeyFlag,
Value: "col2-value",
Value: "col2-value-updated",
},
}
preColumns := []*model.Column{
Expand Down Expand Up @@ -283,8 +231,7 @@ func TestHandleRowChangedEventsNonUniqueKeyColumnUpdated(t *testing.T) {
}
changefeedID := model.DefaultChangeFeedID("1")
span := spanz.TableIDToComparableSpan(1)
result, size, err := handleRowChangedEvents(changefeedID, span, events...)
require.NoError(t, err)
result, size := handleRowChangedEvents(changefeedID, span, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(224), size)
}
Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package factory
import (
"context"
"net/url"
"strings"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
Expand Down Expand Up @@ -46,7 +45,7 @@ func New(
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := strings.ToLower(sinkURI.Scheme)
scheme := sink.GetScheme(sinkURI)
switch scheme {
case sink.KafkaScheme, sink.KafkaSSLScheme:
factoryCreator := kafka.NewSaramaFactory
Expand Down
6 changes: 6 additions & 0 deletions cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/pkg/sink"
"go.uber.org/zap"
)

Expand All @@ -42,6 +43,11 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang
return nil
}

// Scheme returns the scheme of the sink.
func (s *DMLSink) Scheme() string {
	return sink.BlackHoleScheme
}

// Close do nothing.
func (s *DMLSink) Close() {}

Expand Down
Loading
Loading