From d70671b345dbfb06af1bbc8520e7db2fe07b826c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 6 Sep 2023 14:40:19 +0800 Subject: [PATCH 01/18] only split the event at the same place. --- cdc/model/sink.go | 3 - cdc/model/sink_test.go | 38 ++++++++++++ .../sinkmanager/table_sink_wrapper.go | 55 +---------------- .../sinkmanager/table_sink_wrapper_test.go | 61 +------------------ 4 files changed, 43 insertions(+), 114 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index fbbc30eb579..83781a57a62 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -768,9 +768,6 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { // TrySplitAndSortUpdateEvent split update events if unique key is updated func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error { - if len(t.Rows) < 2 { - return nil - } newRows, err := trySplitAndSortUpdateEvent(t.Rows) if err != nil { return errors.Trace(err) diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index e0a0aed9d14..33129c8c8f2 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -563,3 +563,41 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(result)) } + +func TestTrySplitAndSortUpdateEventOne(t *testing.T) { + txn := &SingleTableTxn{ + Rows: make([]*RowChangedEvent, 0, 1), + } + + txn.Rows = append(txn.Rows, &RowChangedEvent{ + PreColumns: []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: HandleKeyFlag | UniqueKeyFlag, + Value: "col2-value", + }, + }, + + Columns: []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: HandleKeyFlag | UniqueKeyFlag, + Value: "col2-value-updated", + }, + }, + }) + + err := txn.TrySplitAndSortUpdateEvent() + require.NoError(t, err) + require.Len(t, txn.Rows, 2) +} diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index d41c10c2928..b8667cf798b 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -455,7 +455,6 @@ func handleRowChangedEvents( } rowEvent := e.Row - // Some transactions could generate empty row change event, such as // begin; insert into t (id) values (1); delete from t where id=1; commit; // Just ignore these row changed events. @@ -468,62 +467,12 @@ func handleRowChangedEvents( continue } - if !(rowEvent.IsUpdate() && shouldSplitUpdateEvent(rowEvent)) { - size += e.Row.ApproximateBytes() - rowChangedEvents = append(rowChangedEvents, e.Row) - continue - } - - deleteEvent, insertEvent, err := splitUpdateEvent(rowEvent) - if err != nil { - return nil, 0, errors.Trace(err) - } - // NOTICE: Please do not change the order, the delete event always comes before the insert event. - size += deleteEvent.ApproximateBytes() - size += insertEvent.ApproximateBytes() - rowChangedEvents = append(rowChangedEvents, deleteEvent, insertEvent) + size += rowEvent.ApproximateBytes() + rowChangedEvents = append(rowChangedEvents, rowEvent) } return rowChangedEvents, uint64(size), nil } -// shouldSplitUpdateEvent return true if the unique key column is modified. -func shouldSplitUpdateEvent(updateEvent *model.RowChangedEvent) bool { - // nil event will never be split. 
- if updateEvent == nil { - return false - } - - for i := range updateEvent.Columns { - col := updateEvent.Columns[i] - preCol := updateEvent.PreColumns[i] - if col != nil && (col.Flag.IsUniqueKey() || col.Flag.IsHandleKey()) && - preCol != nil && (preCol.Flag.IsUniqueKey() || preCol.Flag.IsHandleKey()) { - colValueString := model.ColumnValueString(col.Value) - preColValueString := model.ColumnValueString(preCol.Value) - if colValueString != preColValueString { - return true - } - } - } - - return false -} - -// splitUpdateEvent splits an update event into a delete and an insert event. -func splitUpdateEvent(updateEvent *model.RowChangedEvent) (*model.RowChangedEvent, *model.RowChangedEvent, error) { - if updateEvent == nil { - return nil, nil, errors.New("nil event cannot be split") - } - - deleteEvent := *updateEvent - deleteEvent.Columns = nil - - // set the `PreColumns` to nil to make the update into an insert. - updateEvent.PreColumns = nil - - return &deleteEvent, updateEvent, nil -} - func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { backoffBaseDelayInMs := int64(100) totalRetryDuration := 10 * time.Second diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index b10df73aad4..b47fa040bad 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -182,62 +182,7 @@ func TestHandleEmptyRowChangedEvents(t *testing.T) { require.Equal(t, uint64(0), size) } -func TestHandleRowChangedEventsUniqueKeyColumnUpdated(t *testing.T) { - t.Parallel() - - columns := []*model.Column{ - { - Name: "col1", - Flag: model.BinaryFlag, - Value: "col1-value-updated", - }, - { - Name: "col2", - Flag: model.HandleKeyFlag | model.UniqueKeyFlag, - Value: "col2-value-updated", - }, - } - preColumns := []*model.Column{ - { - Name: "col1", - Flag: model.BinaryFlag, - Value: "col1-value", - }, - { - Name: "col2", - Flag: model.HandleKeyFlag | model.UniqueKeyFlag, - Value: "col2-value", - }, - } - - // the handle key updated, should be split into 2 events - events := []*model.PolymorphicEvent{ - { - CRTs: 1, - RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, - Row: &model.RowChangedEvent{ - CommitTs: 1, - Columns: columns, - PreColumns: preColumns, - Table: &model.TableName{ - Schema: "test", - Table: "test", - }, - }, - }, - } - changefeedID := model.DefaultChangeFeedID("1") - span := spanz.TableIDToComparableSpan(1) - result, size, err := handleRowChangedEvents(changefeedID, span, events...) - require.NoError(t, err) - require.Equal(t, 2, len(result)) - require.Equal(t, uint64(448), size) - - require.True(t, result[0].IsDelete()) - require.True(t, result[1].IsInsert()) -} - -func TestHandleRowChangedEventsNonUniqueKeyColumnUpdated(t *testing.T) { +func TestHandleRowChangedEventNormalEvent(t *testing.T) { t.Parallel() // Update non-unique key. @@ -245,12 +190,12 @@ func TestHandleRowChangedEventsNonUniqueKeyColumnUpdated(t *testing.T) { { Name: "col1", Flag: model.BinaryFlag, - Value: "col1-value-updated", + Value: "col1-value", }, { Name: "col2", Flag: model.HandleKeyFlag | model.UniqueKeyFlag, - Value: "col2-value", + Value: "col2-value-updated", }, } preColumns := []*model.Column{ From 664efa20805bb3deaa87d11f79b9b5b7f4caf874 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 6 Sep 2023 16:32:20 +0800 Subject: [PATCH 02/18] fix unit test. 
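
Context for this fixup: the previous patch moved unique-key update splitting out of the sink wrapper and into the model layer, so `handleRowChangedEvents` no longer has an error path and its callers below drop the error handling. For readers new to the splitting rule itself, here is a minimal, self-contained Go sketch of what `trySplitAndSortUpdateEvent` does to a unique-key-changed update; the types are toy stand-ins, not the real model types:

```go
package main

import "fmt"

// Toy stand-ins for model.Column and model.RowChangedEvent; the real types
// mark key columns with HandleKeyFlag/UniqueKeyFlag rather than a bool.
type col struct {
	uk    bool
	value string
}

type row struct {
	preCols []col // old values; nil would mean the event is an insert
	cols    []col // new values; nil would mean the event is a delete
}

// splitIfUKChanged sketches the rule now centralized in
// model.trySplitAndSortUpdateEvent: an update whose unique/handle key
// changed becomes a delete (old values) followed by an insert (new values).
func splitIfUKChanged(ev row) []row {
	for i := range ev.cols {
		if ev.cols[i].uk && ev.preCols[i].uk && ev.cols[i].value != ev.preCols[i].value {
			del := row{preCols: ev.preCols} // keep only old values -> delete
			ins := row{cols: ev.cols}       // keep only new values -> insert
			return []row{del, ins}          // the delete must precede the insert
		}
	}
	return []row{ev} // key unchanged: keep the update event as-is
}

func main() {
	ev := row{
		preCols: []col{{uk: true, value: "1"}, {uk: false, value: "a"}},
		cols:    []col{{uk: true, value: "2"}, {uk: false, value: "a"}},
	}
	fmt.Println(len(splitIfUKChanged(ev))) // 2
}
```

As the removed wrapper code notes ("the delete event always comes before the insert event"), ordering matters: applying the delete first is what keeps a unique-key swap from conflicting mid-transaction.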
--- cdc/processor/sinkmanager/redo_log_worker.go | 5 +---- cdc/processor/sinkmanager/table_sink_worker.go | 6 +----- cdc/processor/sinkmanager/table_sink_wrapper.go | 4 ++-- cdc/processor/sinkmanager/table_sink_wrapper_test.go | 9 +++------ 4 files changed, 7 insertions(+), 17 deletions(-) diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index f2f9fe1aca5..89f2c7d604d 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -142,10 +142,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e if e.Row != nil { // For all events, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size, err = handleRowChangedEvents(w.changefeedID, task.span, e) - if err != nil { - return errors.Trace(err) - } + x, size = handleRowChangedEvents(w.changefeedID, task.span, e) advancer.appendEvents(x, size) } diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 687583219c0..ac77e424a2c 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -250,11 +250,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size, err := handleRowChangedEvents(w.changefeedID, task.span, e) - if err != nil { - return err - } - + x, size := handleRowChangedEvents(w.changefeedID, task.span, e) advancer.appendEvents(x, size) allEventSize += size } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index b8667cf798b..f6b74d6ee68 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -441,7 +441,7 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6 func handleRowChangedEvents( changefeed model.ChangeFeedID, span tablepb.Span, events ...*model.PolymorphicEvent, -) ([]*model.RowChangedEvent, uint64, error) { +) ([]*model.RowChangedEvent, uint64) { size := 0 rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events)) for _, e := range events { @@ -470,7 +470,7 @@ func handleRowChangedEvents( size += rowEvent.ApproximateBytes() rowChangedEvents = append(rowChangedEvents, rowEvent) } - return rowChangedEvents, uint64(size), nil + return rowChangedEvents, uint64(size) } func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index b47fa040bad..43f0bd295c6 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -153,8 +153,7 @@ func TestHandleNilRowChangedEvents(t *testing.T) { events := []*model.PolymorphicEvent{nil} changefeedID := model.DefaultChangeFeedID("1") span := spanz.TableIDToComparableSpan(1) - result, size, err := handleRowChangedEvents(changefeedID, span, events...) - require.NoError(t, err) + result, size := handleRowChangedEvents(changefeedID, span, events...) 
require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -176,8 +175,7 @@ func TestHandleEmptyRowChangedEvents(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") span := spanz.TableIDToComparableSpan(1) - result, size, err := handleRowChangedEvents(changefeedID, span, events...) - require.NoError(t, err) + result, size := handleRowChangedEvents(changefeedID, span, events...) require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -228,8 +226,7 @@ func TestHandleRowChangedEventNormalEvent(t *testing.T) { } changefeedID := model.DefaultChangeFeedID("1") span := spanz.TableIDToComparableSpan(1) - result, size, err := handleRowChangedEvents(changefeedID, span, events...) - require.NoError(t, err) + result, size := handleRowChangedEvents(changefeedID, span, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(224), size) } From e4dc5b2ae5632bf4b8c41a4324319b9ced7ae97d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 8 Sep 2023 15:21:34 +0800 Subject: [PATCH 03/18] do not split the update if the message count less than 2 --- cdc/model/sink.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 83781a57a62..fbbc30eb579 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -768,6 +768,9 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { // TrySplitAndSortUpdateEvent split update events if unique key is updated func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error { + if len(t.Rows) < 2 { + return nil + } newRows, err := trySplitAndSortUpdateEvent(t.Rows) if err != nil { return errors.Trace(err) From 93f8e483d7162696e4ed510140c49338743425d0 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 11 Sep 2023 16:02:03 +0800 Subject: [PATCH 04/18] split single row transaction by the sink type. 
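
The wiring behind this: every `EventSink` now reports its `Scheme()`, and the table sink computes the split flag once at construction instead of deciding per event. A compressed, runnable sketch of that derivation, using a toy interface and a simplified stand-in for `pkg/sink.IsMySQLCompatibleScheme`:

```go
package main

import "fmt"

// eventSink mirrors the Scheme() method this patch adds to dmlsink.EventSink.
type eventSink interface {
	Scheme() string
}

type mysqlSink struct{}

func (mysqlSink) Scheme() string { return "mysql" }

type kafkaSink struct{}

func (kafkaSink) Scheme() string { return "kafka" }

// isMySQLCompatible is a simplified stand-in for pkg/sink.IsMySQLCompatibleScheme
// (the real helper also covers the mysql+ssl, tidb, and tidb+ssl schemes).
func isMySQLCompatible(scheme string) bool {
	return scheme == "mysql" || scheme == "tidb"
}

// tableSink shows how splitSingleUpdate is derived once, at construction
// time, rather than being decided per event.
type tableSink struct {
	backend           eventSink
	splitSingleUpdate bool
}

func newTableSink(backend eventSink) *tableSink {
	return &tableSink{
		backend:           backend,
		splitSingleUpdate: !isMySQLCompatible(backend.Scheme()),
	}
}

func main() {
	fmt.Println(newTableSink(mysqlSink{}).splitSingleUpdate) // false
	fmt.Println(newTableSink(kafkaSink{}).splitSingleUpdate) // true
}
```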
--- cdc/model/sink.go | 9 +++++---- cdc/model/sink_test.go | 7 ++++++- .../sinkmanager/table_sink_wrapper_test.go | 5 +++++ cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go | 6 ++++++ .../dmlsink/cloudstorage/cloud_storage_dml_sink.go | 7 +++++++ cdc/sink/dmlsink/event.go | 2 +- cdc/sink/dmlsink/event_sink.go | 4 ++++ cdc/sink/dmlsink/mq/mq_dml_sink.go | 5 +++++ cdc/sink/dmlsink/txn/txn_dml_sink.go | 8 ++++++-- cdc/sink/tablesink/table_sink_impl.go | 13 ++++++++++++- cdc/sink/tablesink/table_sink_impl_test.go | 5 +++++ 11 files changed, 62 insertions(+), 9 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index fbbc30eb579..c8714559907 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -271,7 +271,7 @@ func (r *RedoLog) GetCommitTs() Ts { } // TrySplitAndSortUpdateEvent redo log do nothing -func (r *RedoLog) TrySplitAndSortUpdateEvent() error { +func (r *RedoLog) TrySplitAndSortUpdateEvent(_ bool) error { return nil } @@ -379,7 +379,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent do nothing -func (r *RowChangedEvent) TrySplitAndSortUpdateEvent() error { +func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ bool) error { return nil } @@ -767,8 +767,9 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent split update events if unique key is updated -func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error { - if len(t.Rows) < 2 { +func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(splitSingleUpdateEvent bool) error { + // the txn only have one row, and no need to split the update event, just return + if len(t.Rows) < 2 && !splitSingleUpdateEvent { return nil } newRows, err := trySplitAndSortUpdateEvent(t.Rows) diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 33129c8c8f2..ea7e59e81cd 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -597,7 +597,12 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { }, }) - err := txn.TrySplitAndSortUpdateEvent() + // assume it's Kafka or storage sink. 
+ err := txn.TrySplitAndSortUpdateEvent(true) require.NoError(t, err) require.Len(t, txn.Rows, 2) + + err = txn.TrySplitAndSortUpdateEvent(false) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 43f0bd295c6..91688e974a3 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/tablesink" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -50,6 +51,10 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh return nil } +func (m *mockSink) Scheme() string { + return sink.BlackHoleScheme +} + func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] { m.mu.Lock() defer m.mu.Unlock() diff --git a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go index 673a2ff2b97..6cc1d1f35dd 100644 --- a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go @@ -17,6 +17,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" + "github.com/pingcap/tiflow/pkg/sink" "go.uber.org/zap" ) @@ -42,6 +43,11 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang return nil } +// Scheme return the scheme of the sink. +func (s *DMLSink) Scheme() string { + return sink.BlackHoleScheme +} + // Close do nothing. func (s *DMLSink) Close() {} diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 6044fc52596..1b0fb7c3983 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -17,6 +17,7 @@ import ( "context" "math" "net/url" + "strings" "sync" "sync/atomic" @@ -65,6 +66,7 @@ type eventFragment struct { // It will send the events to cloud storage systems. type DMLSink struct { changefeedID model.ChangeFeedID + scheme string // last sequence number lastSeqNum uint64 // encodingWorkers defines a group of workers for encoding events. @@ -133,6 +135,7 @@ func NewDMLSink(ctx context.Context, wgCtx, wgCancel := context.WithCancel(ctx) s := &DMLSink{ changefeedID: changefeedID, + scheme: strings.ToLower(sinkURI.Scheme), encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), workers: make([]*dmlWorker, cfg.WorkerCount), statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink), @@ -267,3 +270,7 @@ func (s *DMLSink) Close() { func (s *DMLSink) Dead() <-chan struct{} { return s.dead } + +func (s *DMLSink) Scheme() string { + return s.scheme +} diff --git a/cdc/sink/dmlsink/event.go b/cdc/sink/dmlsink/event.go index da9466a90de..1dbfedc30dc 100644 --- a/cdc/sink/dmlsink/event.go +++ b/cdc/sink/dmlsink/event.go @@ -23,7 +23,7 @@ type TableEvent interface { // GetCommitTs returns the commit timestamp of the event. 
GetCommitTs() uint64 // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated - TrySplitAndSortUpdateEvent() error + TrySplitAndSortUpdateEvent(splitSingleEvent bool) error } // CallbackFunc is the callback function for callbackable event. diff --git a/cdc/sink/dmlsink/event_sink.go b/cdc/sink/dmlsink/event_sink.go index 0de7e79f060..76a179ecc9c 100644 --- a/cdc/sink/dmlsink/event_sink.go +++ b/cdc/sink/dmlsink/event_sink.go @@ -18,6 +18,10 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error + + // Scheme returns the sink scheme. + Scheme() string + // Close closes the sink. Can be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index 2f01f57a598..d17bd21e097 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -185,3 +185,8 @@ func (s *dmlSink) Close() { func (s *dmlSink) Dead() <-chan struct{} { return s.dead } + +// Scheme returns the scheme of this sink. +func (s *dmlSink) Scheme() string { + return sink.KafkaScheme +} diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index a96aa9f60f7..0d89c7b42b9 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/pkg/causality" "github.com/pingcap/tiflow/pkg/config" - psink "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "golang.org/x/sync/errgroup" ) @@ -72,7 +72,7 @@ func NewMySQLSink( conflictDetectorSlots uint64, ) (*dmlSink, error) { ctx, cancel := context.WithCancel(ctx) - statistics := metrics.NewStatistics(ctx, changefeedID, psink.TxnSink) + statistics := metrics.NewStatistics(ctx, changefeedID, sink.TxnSink) backendImpls, err := mysql.NewMySQLBackends(ctx, changefeedID, sinkURI, replicaConfig, GetDBConnImpl, statistics) if err != nil { @@ -173,3 +173,7 @@ func (s *dmlSink) Close() { func (s *dmlSink) Dead() <-chan struct{} { return s.dead } + +func (s *dmlSink) Scheme() string { + return sink.MySQLScheme +} diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index 332cccba64c..f6df0959eb5 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" + "github.com/pingcap/tiflow/pkg/sink" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -48,6 +49,15 @@ type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct { // For dataflow metrics. metricsTableSinkTotalRows prometheus.Counter + + // splitSingleUpdate control whether split a single update event into delete and insert events. + // For the MySQL Sink, there is no need to split a single unique key changed update event, this + // is also to keep the backward compatibility, the same behavior as before. + // For the Kafka Sink and Storage sink, we need to split a single unique key changed update event + // 1. 
Avro and CSV does not output the previous column values for the update event, so it would + // cause consumer missing data if the unique key changed event is not split. + // 2. Index-Value Dispatcher can work correctly if the unique key changed event is split. + splitSingleUpdate bool } // New an eventTableSink with given backendSink and event appender. @@ -70,6 +80,7 @@ func New[E dmlsink.TableEvent, P dmlsink.Appender[E]]( eventBuffer: make([]E, 0, 1024), state: state.TableSinkSinking, metricsTableSinkTotalRows: totalRowsCounter, + splitSingleUpdate: !sink.IsMySQLCompatibleScheme(backendSink.Scheme()), } } @@ -110,7 +121,7 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err resolvedCallbackableEvents := make([]*dmlsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - if err := ev.TrySplitAndSortUpdateEvent(); err != nil { + if err := ev.TrySplitAndSortUpdateEvent(e.splitSingleUpdate); err != nil { return SinkInternalError{err} } // We have to record the event ID for the callback. diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 86f2e79e061..95a26711a25 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -48,6 +49,10 @@ func (m *mockEventSink) Dead() <-chan struct{} { return m.dead } +func (m *mockEventSink) Scheme() string { + return sink.BlackHoleScheme +} + // acknowledge the txn events by call the callback function. func (m *mockEventSink) acknowledge(commitTs uint64) []*dmlsink.TxnCallbackableEvent { var droppedEvents []*dmlsink.TxnCallbackableEvent From 27727139a84c387dc06412a12fbaec95aae0d3c1 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 11 Sep 2023 16:05:14 +0800 Subject: [PATCH 05/18] add one more unit test. 
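
The new assertion holds because the test's mock backend reports the blackhole scheme, which is not MySQL-compatible, so `New` sets `splitSingleUpdate` to true. As a reference for what that flag gates, a toy truth table of the early return in `TrySplitAndSortUpdateEvent` (a sketch, not the real method):

```go
package main

import "fmt"

// willRunSplitPass sketches the early return added to
// SingleTableTxn.TrySplitAndSortUpdateEvent in the previous patches. Note it
// only gates whether the split/sort pass runs at all; the pass itself still
// splits just those updates whose unique key actually changed.
func willRunSplitPass(rowCount int, splitSingleUpdateEvent bool) bool {
	if rowCount < 2 && !splitSingleUpdateEvent {
		return false // a lone update stays intact (MySQL-compatible sinks)
	}
	return true
}

func main() {
	for _, c := range []struct {
		rows  int
		split bool
	}{{1, false}, {1, true}, {2, false}, {2, true}} {
		fmt.Printf("rows=%d splitSingle=%v -> run pass: %v\n",
			c.rows, c.split, willRunSplitPass(c.rows, c.split))
	}
}
```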
--- cdc/sink/tablesink/table_sink_impl_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 95a26711a25..72e27fa7b83 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -173,6 +173,7 @@ func TestNewEventTableSink(t *testing.T) { require.NotNil(t, tb.eventAppender, "eventAppender should be set") require.Equal(t, 0, len(tb.eventBuffer), "eventBuffer should be empty") require.Equal(t, state.TableSinkSinking, tb.state, "table sink should be sinking") + require.True(t, tb.splitSingleUpdate, "splitSingleUpdate should be true") } func TestAppendRowChangedEvents(t *testing.T) { From b720bc91c3ff1de5064041eac423f56cbd4e8502 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 11 Sep 2023 16:25:20 +0800 Subject: [PATCH 06/18] fix make check --- cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 1b0fb7c3983..6d13ba5359f 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -271,6 +271,7 @@ func (s *DMLSink) Dead() <-chan struct{} { return s.dead } +// Scheme returns the sink scheme. func (s *DMLSink) Scheme() string { return s.scheme } From dcf52e078696879e649565e3ccc679b41bc48adb Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 11 Sep 2023 16:34:46 +0800 Subject: [PATCH 07/18] remove some logs. --- cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 476b113af67..4476a905466 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -41,10 +41,7 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven defer r.lock.Unlock() r.hasher.Reset() r.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) - // FIXME(leoppro): if the row events includes both pre-cols and cols - // the dispatch logic here is wrong - // distribute partition by rowid or unique column value dispatchCols := row.Columns if len(row.Columns) == 0 { dispatchCols = row.PreColumns From 06a26270dba874deb41673428d2d48b4f1dc756b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 11 Sep 2023 17:00:25 +0800 Subject: [PATCH 08/18] fix unit test. 
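
What this fixes: `TrySplitAndSortUpdateEvent` replaces `t.Rows` in place, so after the first call splits the fixture txn into two rows, the same txn cannot also exercise the no-split expectation; the test below now rebuilds the txn between the two calls. A toy demonstration of that in-place mutation, with simplified types:

```go
package main

import "fmt"

// txn is a toy stand-in for SingleTableTxn: splitting rewrites Rows in place.
type txn struct{ rows []string }

func (t *txn) trySplit(split bool) {
	if !split {
		return
	}
	out := make([]string, 0, 2*len(t.rows))
	for _, r := range t.rows {
		// Pretend every row is a unique-key-changed update: it fans out
		// into a delete followed by an insert.
		out = append(out, r+"/delete", r+"/insert")
	}
	t.rows = out
}

func main() {
	t := &txn{rows: []string{"uk-update"}}
	t.trySplit(true)
	fmt.Println(len(t.rows)) // 2: the fixture txn has been mutated

	// Reusing the same txn for the no-split case would still see 2 rows,
	// so the test constructs a fresh txn instead.
	t.trySplit(false)
	fmt.Println(len(t.rows)) // still 2
}
```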
--- cdc/model/sink_test.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index ea7e59e81cd..41144fe219c 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -564,12 +564,8 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { require.Equal(t, 1, len(result)) } -func TestTrySplitAndSortUpdateEventOne(t *testing.T) { - txn := &SingleTableTxn{ - Rows: make([]*RowChangedEvent, 0, 1), - } - - txn.Rows = append(txn.Rows, &RowChangedEvent{ +var ( + uk_updated_event = &RowChangedEvent{ PreColumns: []*Column{ { Name: "col1", @@ -595,13 +591,22 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { Value: "col2-value-updated", }, }, - }) + } +) + +func TestTrySplitAndSortUpdateEventOne(t *testing.T) { + txn := &SingleTableTxn{ + Rows: []*RowChangedEvent{uk_updated_event}, + } // assume it's Kafka or storage sink. err := txn.TrySplitAndSortUpdateEvent(true) require.NoError(t, err) require.Len(t, txn.Rows, 2) + txn = &SingleTableTxn{ + Rows: []*RowChangedEvent{uk_updated_event}, + } err = txn.TrySplitAndSortUpdateEvent(false) require.NoError(t, err) require.Len(t, txn.Rows, 1) From 04c16ced15d370a02f50db107d915a61bf3644df Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 11 Sep 2023 17:39:52 +0800 Subject: [PATCH 09/18] fix unit test. --- cdc/model/sink_test.go | 50 ++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 41144fe219c..d3c0fa2582c 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -564,35 +564,33 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { require.Equal(t, 1, len(result)) } -var ( - uk_updated_event = &RowChangedEvent{ - PreColumns: []*Column{ - { - Name: "col1", - Flag: BinaryFlag, - Value: "col1-value", - }, - { - Name: "col2", - Flag: HandleKeyFlag | UniqueKeyFlag, - Value: "col2-value", - }, +var uk_updated_event = &RowChangedEvent{ + PreColumns: []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + Value: "col1-value", }, + { + Name: "col2", + Flag: HandleKeyFlag | UniqueKeyFlag, + Value: "col2-value", + }, + }, - Columns: []*Column{ - { - Name: "col1", - Flag: BinaryFlag, - Value: "col1-value", - }, - { - Name: "col2", - Flag: HandleKeyFlag | UniqueKeyFlag, - Value: "col2-value-updated", - }, + Columns: []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + Value: "col1-value", }, - } -) + { + Name: "col2", + Flag: HandleKeyFlag | UniqueKeyFlag, + Value: "col2-value-updated", + }, + }, +} func TestTrySplitAndSortUpdateEventOne(t *testing.T) { txn := &SingleTableTxn{ From 163e2f297df22c17d7463360a77ead4ccc0fc269 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 11 Sep 2023 18:01:45 +0800 Subject: [PATCH 10/18] fix unit test. 
--- cdc/model/sink_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index d3c0fa2582c..edc292eb928 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -564,7 +564,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { require.Equal(t, 1, len(result)) } -var uk_updated_event = &RowChangedEvent{ +var ukUpdatedEvent = &RowChangedEvent{ PreColumns: []*Column{ { Name: "col1", @@ -594,7 +594,7 @@ var uk_updated_event = &RowChangedEvent{ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { txn := &SingleTableTxn{ - Rows: []*RowChangedEvent{uk_updated_event}, + Rows: []*RowChangedEvent{ukUpdatedEvent}, } // assume it's Kafka or storage sink. @@ -603,7 +603,7 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { require.Len(t, txn.Rows, 2) txn = &SingleTableTxn{ - Rows: []*RowChangedEvent{uk_updated_event}, + Rows: []*RowChangedEvent{ukUpdatedEvent}, } err = txn.TrySplitAndSortUpdateEvent(false) require.NoError(t, err) From d80294e41c611f504dbab9ce843f9278423abaf3 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 12 Sep 2023 12:06:56 +0800 Subject: [PATCH 11/18] rewrite how to split the txn. --- cdc/model/sink.go | 24 +++++++++++++++++++----- cdc/sink/dmlsink/event.go | 2 +- cdc/sink/tablesink/table_sink_impl.go | 13 +------------ 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index c8714559907..3e78303806a 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/quotes" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -271,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts { } // TrySplitAndSortUpdateEvent redo log do nothing -func (r *RedoLog) TrySplitAndSortUpdateEvent(_ bool) error { +func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error { return nil } @@ -379,7 +380,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent do nothing -func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ bool) error { +func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error { return nil } @@ -767,9 +768,8 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent split update events if unique key is updated -func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(splitSingleUpdateEvent bool) error { - // the txn only have one row, and no need to split the update event, just return - if len(t.Rows) < 2 && !splitSingleUpdateEvent { +func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { + if !t.shouldSplitUpdateEvent(scheme) { return nil } newRows, err := trySplitAndSortUpdateEvent(t.Rows) @@ -780,6 +780,20 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(splitSingleUpdateEvent bool) return nil } +func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { + // Whether split a single update event into delete and insert events: + // For the MySQL Sink, there is no need to split a single unique key changed update event, this + // is also to keep the backward compatibility, the same behavior as before. + // For the Kafka Sink and Storage sink, always split a single unique key changed update event + // 1. 
Avro and CSV does not output the previous column values for the update event, so it would + // cause consumer missing data if the unique key changed event is not split. + // 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. + if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) { + return false + } + return true +} + // trySplitAndSortUpdateEvent try to split update events if unique key is updated // returns true if some updated events is split func trySplitAndSortUpdateEvent( diff --git a/cdc/sink/dmlsink/event.go b/cdc/sink/dmlsink/event.go index 1dbfedc30dc..8df85cab249 100644 --- a/cdc/sink/dmlsink/event.go +++ b/cdc/sink/dmlsink/event.go @@ -23,7 +23,7 @@ type TableEvent interface { // GetCommitTs returns the commit timestamp of the event. GetCommitTs() uint64 // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated - TrySplitAndSortUpdateEvent(splitSingleEvent bool) error + TrySplitAndSortUpdateEvent(scheme string) error } // CallbackFunc is the callback function for callbackable event. diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index f6df0959eb5..da067501727 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" - "github.com/pingcap/tiflow/pkg/sink" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -49,15 +48,6 @@ type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct { // For dataflow metrics. metricsTableSinkTotalRows prometheus.Counter - - // splitSingleUpdate control whether split a single update event into delete and insert events. - // For the MySQL Sink, there is no need to split a single unique key changed update event, this - // is also to keep the backward compatibility, the same behavior as before. - // For the Kafka Sink and Storage sink, we need to split a single unique key changed update event - // 1. Avro and CSV does not output the previous column values for the update event, so it would - // cause consumer missing data if the unique key changed event is not split. - // 2. Index-Value Dispatcher can work correctly if the unique key changed event is split. - splitSingleUpdate bool } // New an eventTableSink with given backendSink and event appender. @@ -80,7 +70,6 @@ func New[E dmlsink.TableEvent, P dmlsink.Appender[E]]( eventBuffer: make([]E, 0, 1024), state: state.TableSinkSinking, metricsTableSinkTotalRows: totalRowsCounter, - splitSingleUpdate: !sink.IsMySQLCompatibleScheme(backendSink.Scheme()), } } @@ -121,7 +110,7 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err resolvedCallbackableEvents := make([]*dmlsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - if err := ev.TrySplitAndSortUpdateEvent(e.splitSingleUpdate); err != nil { + if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.Scheme()); err != nil { return SinkInternalError{err} } // We have to record the event ID for the callback. 
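
A note on the rewrite in the patch above: passing the sink scheme instead of a precomputed bool moves the whole policy into the model layer, so `shouldSplitUpdateEvent` reads as a single predicate over the row count and the scheme. A runnable sketch of that predicate, with a simplified stand-in for `pkg/sink.IsMySQLCompatibleScheme`:

```go
package main

import "fmt"

// isMySQLCompatible stands in for pkg/sink.IsMySQLCompatibleScheme.
func isMySQLCompatible(scheme string) bool {
	switch scheme {
	case "mysql", "mysql+ssl", "tidb", "tidb+ssl":
		return true
	}
	return false
}

// shouldSplit mirrors SingleTableTxn.shouldSplitUpdateEvent after the
// rewrite: only a single-row txn headed to a MySQL-compatible sink skips
// the split pass; everything else goes through it.
func shouldSplit(rowCount int, scheme string) bool {
	return !(rowCount < 2 && isMySQLCompatible(scheme))
}

func main() {
	fmt.Println(shouldSplit(1, "mysql")) // false: the lone update is kept intact
	fmt.Println(shouldSplit(1, "kafka")) // true: Avro/CSV and Index-Value need the split
	fmt.Println(shouldSplit(5, "mysql")) // true: multi-row txns are always examined
}
```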
From 654084d80e5051c8fa1ecee5bff7e50bfa83c56d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 12 Sep 2023 12:08:33 +0800 Subject: [PATCH 12/18] fix the ut --- cdc/model/sink_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index edc292eb928..eb899d6c109 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -20,6 +20,7 @@ import ( timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" + "github.com/pingcap/tiflow/pkg/sink" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -597,15 +598,14 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { Rows: []*RowChangedEvent{ukUpdatedEvent}, } - // assume it's Kafka or storage sink. - err := txn.TrySplitAndSortUpdateEvent(true) + err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme) require.NoError(t, err) require.Len(t, txn.Rows, 2) txn = &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(false) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) require.NoError(t, err) require.Len(t, txn.Rows, 1) } From cdca12bc8e154d482972fa835d9448ed189780e2 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 12 Sep 2023 14:06:26 +0800 Subject: [PATCH 13/18] fix unit test. --- cdc/sink/ddlsink/factory/factory.go | 3 +-- cdc/sink/dmlsink/factory/factory.go | 3 +-- cdc/sink/dmlsink/mq/kafka_dml_sink.go | 6 ++++-- cdc/sink/dmlsink/mq/mq_dml_sink.go | 6 +++++- cdc/sink/dmlsink/mq/pulsar_dml_sink.go | 7 +++++-- cdc/sink/dmlsink/txn/txn_dml_sink.go | 14 +++++++++----- cdc/sink/tablesink/table_sink_impl_test.go | 1 - 7 files changed, 25 insertions(+), 15 deletions(-) diff --git a/cdc/sink/ddlsink/factory/factory.go b/cdc/sink/ddlsink/factory/factory.go index 1a8107c1bf1..17dc2e8a504 100644 --- a/cdc/sink/ddlsink/factory/factory.go +++ b/cdc/sink/ddlsink/factory/factory.go @@ -16,7 +16,6 @@ package factory import ( "context" "net/url" - "strings" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -46,7 +45,7 @@ func New( if err != nil { return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) } - scheme := strings.ToLower(sinkURI.Scheme) + scheme := sink.GetScheme(sinkURI) switch scheme { case sink.KafkaScheme, sink.KafkaSSLScheme: factoryCreator := kafka.NewSaramaFactory diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 5b666803403..e707933bdaf 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -16,7 +16,6 @@ package factory import ( "context" "net/url" - "strings" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -63,7 +62,7 @@ func New( } s := &SinkFactory{} - schema := strings.ToLower(sinkURI.Scheme) + schema := sink.GetScheme(sinkURI) switch schema { case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: txnSink, err := txn.NewMySQLSink(ctx, changefeedID, sinkURI, cfg, errCh, diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index b29688d5bca..1c491dd6c1e 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/codec" 
"github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/pingcap/tiflow/pkg/sink/kafka" @@ -90,7 +91,8 @@ func NewKafkaDMLSink( return nil, errors.Trace(err) } - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, sinkURI.Scheme) + scheme := sink.GetScheme(sinkURI) + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, scheme) if err != nil { return nil, errors.Trace(err) } @@ -116,7 +118,7 @@ func NewKafkaDMLSink( concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency) encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, - eventRouter, encoderGroup, protocol, errCh) + eventRouter, encoderGroup, protocol, scheme, errCh) log.Info("DML sink producer created", zap.String("namespace", changefeedID.Namespace), zap.String("changefeedID", changefeedID.ID)) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index d17bd21e097..b75cd0eefed 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -63,6 +63,8 @@ type dmlSink struct { wg sync.WaitGroup dead chan struct{} + + scheme string } func newDMLSink( @@ -74,6 +76,7 @@ func newDMLSink( eventRouter *dispatcher.EventRouter, encoderGroup codec.EncoderGroup, protocol config.Protocol, + scheme string, errCh chan error, ) *dmlSink { ctx, cancel := context.WithCancel(ctx) @@ -87,6 +90,7 @@ func newDMLSink( ctx: ctx, cancel: cancel, dead: make(chan struct{}), + scheme: scheme, } s.alive.eventRouter = eventRouter s.alive.topicManager = topicManager @@ -188,5 +192,5 @@ func (s *dmlSink) Dead() <-chan struct{} { // Scheme returns the scheme of this sink. func (s *dmlSink) Scheme() string { - return sink.KafkaScheme + return s.scheme } diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go index 6a11fc512a6..f976f9bff5e 100644 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/builder" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" @@ -86,13 +87,14 @@ func NewPulsarDMLSink( return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) } + scheme := sink.GetScheme(sinkURI) // The topicManager is not actually used in pulsar , it is only used to create dmlSink. // TODO: Find a way to remove it in newDMLSink. 
topicManager, err := pulsarTopicManagerCreator(pConfig, client) if err != nil { return nil, errors.Trace(err) } - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme) + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, scheme) if err != nil { return nil, errors.Trace(err) } @@ -111,7 +113,8 @@ func NewPulsarDMLSink( concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency) encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) - s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderGroup, protocol, errCh) + s := newDMLSink(ctx, changefeedID, p, nil, topicManager, + eventRouter, encoderGroup, protocol, scheme, errCh) return s, nil } diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index 0d89c7b42b9..b8f78059f5d 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -54,6 +54,8 @@ type dmlSink struct { dead chan struct{} statistics *metrics.Statistics + + scheme string } // GetDBConnImpl is the implementation of pmysql.Factory. @@ -84,11 +86,13 @@ func NewMySQLSink( for _, impl := range backendImpls { backends = append(backends, impl) } - sink := newSink(ctx, changefeedID, backends, errCh, conflictDetectorSlots) - sink.statistics = statistics - sink.cancel = cancel - return sink, nil + s := newSink(ctx, changefeedID, backends, errCh, conflictDetectorSlots) + s.statistics = statistics + s.cancel = cancel + s.scheme = sink.GetScheme(sinkURI) + + return s, nil } func newSink(ctx context.Context, @@ -175,5 +179,5 @@ func (s *dmlSink) Dead() <-chan struct{} { } func (s *dmlSink) Scheme() string { - return sink.MySQLScheme + return s.scheme } diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 72e27fa7b83..95a26711a25 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -173,7 +173,6 @@ func TestNewEventTableSink(t *testing.T) { require.NotNil(t, tb.eventAppender, "eventAppender should be set") require.Equal(t, 0, len(tb.eventBuffer), "eventBuffer should be empty") require.Equal(t, state.TableSinkSinking, tb.state, "table sink should be sinking") - require.True(t, tb.splitSingleUpdate, "splitSingleUpdate should be true") } func TestAppendRowChangedEvents(t *testing.T) { From 92dcfd9efd7fdc45ea6370fa71f6dbd44e6f8a64 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 12 Sep 2023 14:32:34 +0800 Subject: [PATCH 14/18] fix unit test. --- pkg/sink/sink_type.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/sink/sink_type.go b/pkg/sink/sink_type.go index 09359ad4c15..382d8437689 100644 --- a/pkg/sink/sink_type.go +++ b/pkg/sink/sink_type.go @@ -13,6 +13,11 @@ package sink +import ( + "net/url" + "strings" +) + // Type is the type of sink. type Type int @@ -91,3 +96,8 @@ func IsStorageScheme(scheme string) bool { func IsPulsarScheme(scheme string) bool { return scheme == PulsarScheme || scheme == PulsarSSLScheme } + +// GetScheme returns the scheme of the url. +func GetScheme(url *url.URL) string { + return strings.ToLower(url.Scheme) +} From 87ed2ea4282138eeb2a1f3285ef1d1ded34628d9 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 12 Sep 2023 15:49:25 +0800 Subject: [PATCH 15/18] adjust encoder group concurrency. 
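
The shape of this change: the codec package's private `defaultEncoderGroupSize` constant is replaced by a shared `config.DefaultEncoderGroupConcurrency` (32), which also raises the replica-config default from 16 to 32. A minimal sketch of the fallback the encoder group keeps for non-positive counts (constant name from this patch; surrounding fields elided):

```go
package main

import "fmt"

// Shared default introduced in pkg/config by this patch; the replica-config
// default rises from 16 to 32 and codec.NewEncoderGroup reads the same value.
const defaultEncoderGroupConcurrency = 32

// resolveEncoderCount sketches the guard kept in codec.NewEncoderGroup:
// a non-positive requested concurrency falls back to the shared default.
func resolveEncoderCount(requested int) int {
	if requested <= 0 {
		return defaultEncoderGroupConcurrency
	}
	return requested
}

func main() {
	fmt.Println(resolveEncoderCount(0)) // 32
	fmt.Println(resolveEncoderCount(8)) // 8
}
```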
--- pkg/config/replica_config.go | 2 +- pkg/config/sink.go | 3 +++ pkg/sink/codec/encoder_group.go | 8 ++++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 9febd795db5..18bfc6825a8 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -59,7 +59,7 @@ var defaultReplicaConfig = &ReplicaConfig{ NullString: NULL, BinaryEncodingMethod: BinaryEncodingBase64, }, - EncoderConcurrency: util.AddressOf(16), + EncoderConcurrency: util.AddressOf(DefaultEncoderGroupConcurrency), Terminator: util.AddressOf(CRLF), DateSeparator: util.AddressOf(DateSeparatorDay.String()), EnablePartitionSeparator: util.AddressOf(true), diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 65557230fe5..702860bd4b9 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -78,6 +78,9 @@ const ( // DefaultPulsarProducerCacheSize is the default size of the cache for producers // 10240 producers maybe cost 1.1G memory DefaultPulsarProducerCacheSize = 10240 + + // DefaultEncoderGroupConcurrency is the default concurrency of encoder group. + DefaultEncoderGroupConcurrency = 32 ) // AtomicityLevel represents the atomicity level of a changefeed. diff --git a/pkg/sink/codec/encoder_group.go b/pkg/sink/codec/encoder_group.go index 3158b4c932a..82f818a7521 100644 --- a/pkg/sink/codec/encoder_group.go +++ b/pkg/sink/codec/encoder_group.go @@ -23,15 +23,15 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) const ( - defaultEncoderGroupSize = 32 - defaultInputChanSize = 128 - defaultMetricInterval = 15 * time.Second + defaultInputChanSize = 128 + defaultMetricInterval = 15 * time.Second ) // EncoderGroup manages a group of encoders @@ -62,7 +62,7 @@ func NewEncoderGroup(builder RowEventEncoderBuilder, count int, changefeedID model.ChangeFeedID, ) *encoderGroup { if count <= 0 { - count = defaultEncoderGroupSize + count = config.DefaultEncoderGroupConcurrency } inputCh := make([]chan *future, count) From 9f376263bd92bf1998ff4d60fcf9a6934c6c7417 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 12 Sep 2023 16:38:53 +0800 Subject: [PATCH 16/18] fix unit test. 
--- cdc/api/v2/model_test.go | 2 +- pkg/config/config_test_data.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index df443684db6..e524e3e54ba 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -50,7 +50,7 @@ var defaultAPIConfig = &ReplicaConfig{ NullString: config.NULL, BinaryEncodingMethod: config.BinaryEncodingBase64, }, - EncoderConcurrency: util.AddressOf(16), + EncoderConcurrency: util.AddressOf(config.DefaultEncoderGroupConcurrency), Terminator: util.AddressOf(config.CRLF), DateSeparator: util.AddressOf(config.DateSeparatorDay.String()), EnablePartitionSeparator: util.AddressOf(true), diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 47fd17f0bcd..0a79cc042ba 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -34,7 +34,7 @@ const ( "worker-num": 3 }, "sink": { - "encoder-concurrency": 16, + "encoder-concurrency": 32, "terminator": "\r\n", "date-separator": "day", "dispatch-rules": [ @@ -182,7 +182,7 @@ const ( "worker-num": 3 }, "sink": { - "encoder-concurrency": 16, + "encoder-concurrency": 32, "protocol": "open-protocol", "column-selectors": [ { @@ -327,7 +327,7 @@ const ( "worker-num": 3 }, "sink": { - "encoder-concurrency": 16, + "encoder-concurrency": 32, "dispatchers": null, "protocol": "open-protocol", "column-selectors": [ From 956ada291c953122d93090e9ae229bcfa0fb3fe0 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 12 Sep 2023 17:03:04 +0800 Subject: [PATCH 17/18] fix unit test. --- pkg/cmd/util/helper_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 07149f7cf03..3fdb15544f2 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -191,7 +191,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { err = cfg.ValidateAndAdjust(sinkURL) require.NoError(t, err) require.Equal(t, &config.SinkConfig{ - EncoderConcurrency: util.AddressOf(16), + EncoderConcurrency: util.AddressOf(config.DefaultEncoderGroupConcurrency), DispatchRules: []*config.DispatchRule{ {PartitionRule: "ts", TopicRule: "hello_{schema}", Matcher: []string{"test1.*", "test2.*"}}, {PartitionRule: "rowid", TopicRule: "{schema}_world", Matcher: []string{"test3.*", "test4.*"}}, @@ -230,7 +230,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { require.NoError(t, err) require.Equal(t, &config.SinkConfig{ Protocol: util.AddressOf(config.ProtocolCanalJSON.String()), - EncoderConcurrency: util.AddressOf(16), + EncoderConcurrency: util.AddressOf(config.DefaultEncoderGroupConcurrency), Terminator: util.AddressOf(config.CRLF), TxnAtomicity: util.AddressOf(config.AtomicityLevel("")), DateSeparator: util.AddressOf("day"), From 94220d8b59c73c544f85e7203156af954a453283 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 13 Sep 2023 14:32:40 +0800 Subject: [PATCH 18/18] adjust the comment. --- cdc/model/sink.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 3e78303806a..decb3d1e8cc 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -780,14 +780,16 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { return nil } +// Whether split a single update event into delete and insert events? 
+//
+// For the MySQL Sink, there is no need to split a single unique key changed update event; this
+// also keeps backward compatibility, the same behavior as before.
+//
+// For the Kafka and Storage sinks, always split a single unique key changed update event, since:
+// 1. Avro and CSV do not output the previous column values for the update event, so consumers
+// would miss data if the unique key changed event is not split.
+// 2. The Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
 func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
 	if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) {
 		return false
 	}