From 7763ebac5ec119812f3cf9acc44658a27b3010f1 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 11 Sep 2023 13:58:27 +0800 Subject: [PATCH 1/5] fix csv update --- cdc/model/sink.go | 19 +++- .../sinkmanager/table_sink_wrapper.go | 78 +---------------- .../sinkmanager/table_sink_wrapper_test.go | 7 +- .../blackhole/black_hole_dml_sink.go | 6 ++ .../cloudstorage/cloud_storage_dml_sink.go | 8 ++ cdc/sinkv2/eventsink/event.go | 5 +- cdc/sinkv2/eventsink/event_sink.go | 2 + cdc/sinkv2/eventsink/mq/kafka_dml_sink.go | 2 +- cdc/sinkv2/eventsink/mq/mq_dml_sink.go | 11 ++- cdc/sinkv2/eventsink/txn/txn_sink.go | 8 ++ cdc/sinkv2/tablesink/table_sink_impl.go | 2 +- cdc/sinkv2/tablesink/table_sink_impl_test.go | 5 ++ .../storage_csv_update/conf/changefeed.toml | 16 ++++ .../storage_csv_update/conf/diff_config.toml | 29 +++++++ .../storage_csv_update/data/data.sql | 86 +++++++++++++++++++ .../storage_csv_update/data/schema.sql | 72 ++++++++++++++++ .../storage_csv_update/data/update.sql | 20 +++++ .../storage_csv_update/run.sh | 66 ++++++++++++++ 18 files changed, 355 insertions(+), 87 deletions(-) create mode 100644 tests/integration_tests/storage_csv_update/conf/changefeed.toml create mode 100644 tests/integration_tests/storage_csv_update/conf/diff_config.toml create mode 100644 tests/integration_tests/storage_csv_update/data/data.sql create mode 100644 tests/integration_tests/storage_csv_update/data/schema.sql create mode 100644 tests/integration_tests/storage_csv_update/data/update.sql create mode 100644 tests/integration_tests/storage_csv_update/run.sh diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 6cdc5d28015..765adc5dc8b 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/quotes" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -266,7 +267,7 @@ func (r *RedoLog) GetCommitTs() Ts { } // TrySplitAndSortUpdateEvent redo log do nothing -func (r *RedoLog) TrySplitAndSortUpdateEvent() error { +func (r *RedoLog) TrySplitAndSortUpdateEvent(sinkScheme string) error { return nil } @@ -364,7 +365,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent do nothing -func (r *RowChangedEvent) TrySplitAndSortUpdateEvent() error { +func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(sinkScheme string) error { return nil } @@ -771,10 +772,11 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent split update events if unique key is updated -func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error { - if len(t.Rows) < 2 { +func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { + if !t.shouldSplitTxn(sinkScheme) { return nil } + newRows, err := trySplitAndSortUpdateEvent(t.Rows) if err != nil { return errors.Trace(err) @@ -783,6 +785,15 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error { return nil } +func (t *SingleTableTxn) shouldSplitTxn(sinkScheme string) bool { + if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) { + return false + } + // For MQ or storage sink, we need to split the transaction with single row, since some + // protocols (such as csv and avro) do not support old value. + return true +} + // trySplitAndSortUpdateEvent try to split update events if unique key is updated // returns true if some updated events is split func trySplitAndSortUpdateEvent( diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 0cefd7e6db0..845e8b67f24 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -470,87 +470,11 @@ func convertRowChangedEvents( } size += e.Row.ApproximateBytes() - - // This indicates that it is an update event, - // and after enable old value internally by default(but disable in the configuration). - // We need to handle the update event to be compatible with the old format. - if e.Row.IsUpdate() && !enableOldValue { - if shouldSplitUpdateEvent(e) { - deleteEvent, insertEvent, err := splitUpdateEvent(e) - if err != nil { - return nil, 0, errors.Trace(err) - } - // NOTICE: Please do not change the order, the delete event always comes before the insert event. - rowChangedEvents = append(rowChangedEvents, deleteEvent.Row, insertEvent.Row) - } else { - // If the handle key columns are not updated, PreColumns is directly ignored. - e.Row.PreColumns = nil - rowChangedEvents = append(rowChangedEvents, e.Row) - } - } else { - rowChangedEvents = append(rowChangedEvents, e.Row) - } + rowChangedEvents = append(rowChangedEvents, e.Row) } return rowChangedEvents, uint64(size), nil } -// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on -// whether the handle key column has been modified. -// If the handle key column is modified, -// we need to use splitUpdateEvent to split the update event into a delete and an insert event. -func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool { - // nil event will never be split. - if updateEvent == nil { - return false - } - - for i := range updateEvent.Row.Columns { - col := updateEvent.Row.Columns[i] - preCol := updateEvent.Row.PreColumns[i] - if col != nil && col.Flag.IsHandleKey() && preCol != nil && preCol.Flag.IsHandleKey() { - colValueString := model.ColumnValueString(col.Value) - preColValueString := model.ColumnValueString(preCol.Value) - // If one handle key columns is updated, we need to split the event row. - if colValueString != preColValueString { - return true - } - } - } - return false -} - -// splitUpdateEvent splits an update event into a delete and an insert event. -func splitUpdateEvent( - updateEvent *model.PolymorphicEvent, -) (*model.PolymorphicEvent, *model.PolymorphicEvent, error) { - if updateEvent == nil { - return nil, nil, errors.New("nil event cannot be split") - } - - // If there is an update to handle key columns, - // we need to split the event into two events to be compatible with the old format. - // NOTICE: Here we don't need a full deep copy because - // our two events need Columns and PreColumns respectively, - // so it won't have an impact and no more full deep copy wastes memory. - deleteEvent := *updateEvent - deleteEventRow := *updateEvent.Row - deleteEventRowKV := *updateEvent.RawKV - deleteEvent.Row = &deleteEventRow - deleteEvent.RawKV = &deleteEventRowKV - - deleteEvent.Row.Columns = nil - - insertEvent := *updateEvent - insertEventRow := *updateEvent.Row - insertEventRowKV := *updateEvent.RawKV - insertEvent.Row = &insertEventRow - insertEvent.RawKV = &insertEventRowKV - // NOTICE: clean up pre cols for insert event. - insertEvent.Row.PreColumns = nil - - return &deleteEvent, &insertEvent, nil -} - func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { backoffBaseDelayInMs := int64(100) totalRetryDuration := 10 * time.Second diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 5cb65c5f609..283d1455f2f 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink" + "github.com/pingcap/tiflow/pkg/sink" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -61,6 +62,10 @@ func (m *mockSink) GetWriteTimes() int { return m.writeTimes } +func (m *mockSink) Scheme() string { + return sink.BlackHoleScheme +} + func (m *mockSink) Close() {} func (m *mockSink) Dead() <-chan struct{} { @@ -269,7 +274,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) { enableOldValue := false result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.NoError(t, err) - require.Equal(t, 2, len(result)) + require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) // Update non-handle key. diff --git a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go index 53d9a9841b8..740e1ac7cef 100644 --- a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go @@ -17,6 +17,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" + "github.com/pingcap/tiflow/pkg/sink" "go.uber.org/zap" ) @@ -42,6 +43,11 @@ func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChange return nil } +// Scheme returns the sink scheme. +func (s *Sink) Scheme() string { + return sink.BlackHoleScheme +} + // Close do nothing. func (s *Sink) Close() {} diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go index 937c374ad96..081098b6ae9 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go @@ -16,6 +16,7 @@ import ( "context" "math" "net/url" + "strings" "sync" "sync/atomic" @@ -65,6 +66,7 @@ type eventFragment struct { // It will send the events to cloud storage systems. type dmlSink struct { changefeedID model.ChangeFeedID + scheme string // last sequence number lastSeqNum uint64 // encodingWorkers defines a group of workers for encoding events. @@ -131,6 +133,7 @@ func NewCloudStorageSink( wgCtx, wgCancel := context.WithCancel(ctx) s := &dmlSink{ changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx), + scheme: strings.ToLower(sinkURI.Scheme), encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), workers: make([]*dmlWorker, cfg.WorkerCount), statistics: metrics.NewStatistics(wgCtx, sink.TxnSink), @@ -241,6 +244,11 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single return nil } +// Scheme returns the sink scheme. +func (s *dmlSink) Scheme() string { + return s.scheme +} + // Close closes the cloud storage sink. func (s *dmlSink) Close() { if s.cancel != nil { diff --git a/cdc/sinkv2/eventsink/event.go b/cdc/sinkv2/eventsink/event.go index 0ffe68913a1..619fcb87d76 100644 --- a/cdc/sinkv2/eventsink/event.go +++ b/cdc/sinkv2/eventsink/event.go @@ -22,8 +22,9 @@ import ( type TableEvent interface { // GetCommitTs returns the commit timestamp of the event. GetCommitTs() uint64 - // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated - TrySplitAndSortUpdateEvent() error + // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated. + // Note that sinkScheme is used to control the split behavior. + TrySplitAndSortUpdateEvent(sinkScheme string) error } // CallbackFunc is the callback function for callbackable event. diff --git a/cdc/sinkv2/eventsink/event_sink.go b/cdc/sinkv2/eventsink/event_sink.go index 0dd1694ed48..c9aa11862b9 100644 --- a/cdc/sinkv2/eventsink/event_sink.go +++ b/cdc/sinkv2/eventsink/event_sink.go @@ -18,6 +18,8 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error + // Scheme returns the sink scheme. + Scheme() string // Close closes the sink. Can be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. diff --git a/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go b/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go index 998ef8b3318..5b42448d05c 100644 --- a/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go @@ -117,7 +117,7 @@ func NewKafkaDMLSink( return nil, errors.Trace(err) } - s, err := newSink(ctx, p, topicManager, eventRouter, encoderConfig, + s, err := newSink(ctx, sinkURI, p, topicManager, eventRouter, encoderConfig, replicaConfig.Sink.EncoderConcurrency, errCh) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go index 25d924f955d..b9ffcbc8d87 100644 --- a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go @@ -15,6 +15,8 @@ package mq import ( "context" + "net/url" + "strings" "sync" "github.com/pingcap/errors" @@ -42,7 +44,8 @@ var _ eventsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil) // It will send the events to the MQ system. type dmlSink struct { // id indicates this sink belongs to which processor(changefeed). - id model.ChangeFeedID + id model.ChangeFeedID + scheme string // protocol indicates the protocol used by this sink. protocol config.Protocol @@ -65,6 +68,7 @@ type dmlSink struct { } func newSink(ctx context.Context, + sinkURI *url.URL, producer dmlproducer.DMLProducer, topicManager manager.TopicManager, eventRouter *dispatcher.EventRouter, @@ -86,6 +90,7 @@ func newSink(ctx context.Context, s := &dmlSink{ id: changefeedID, + scheme: strings.ToLower(sinkURI.Scheme), protocol: encoderConfig.Protocol, ctx: ctx, cancel: cancel, @@ -168,6 +173,10 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single return nil } +func (s *dmlSink) Scheme() string { + return s.scheme +} + // Close closes the sink. func (s *dmlSink) Close() { if s.cancel != nil { diff --git a/cdc/sinkv2/eventsink/txn/txn_sink.go b/cdc/sinkv2/eventsink/txn/txn_sink.go index db9e85987de..2fe94dd4bab 100644 --- a/cdc/sinkv2/eventsink/txn/txn_sink.go +++ b/cdc/sinkv2/eventsink/txn/txn_sink.go @@ -16,6 +16,7 @@ package txn import ( "context" "net/url" + "strings" "sync" "github.com/pingcap/errors" @@ -46,6 +47,7 @@ type sink struct { conflictDetector *causality.ConflictDetector[*worker, *txnEvent] isDead bool } + scheme string workers []*worker cancel func() @@ -84,6 +86,7 @@ func NewMySQLSink( backends = append(backends, impl) } sink := newSink(ctx, backends, errCh, conflictDetectorSlots) + sink.scheme = strings.ToLower(sinkURI.Scheme) sink.statistics = statistics sink.cancel = cancel @@ -151,6 +154,11 @@ func (s *sink) WriteEvents(txnEvents ...*eventsink.TxnCallbackableEvent) error { return nil } +// Scheme returns the sink scheme. +func (s *sink) Scheme() string { + return s.scheme +} + // Close closes the sink. It won't wait for all pending items backend handled. func (s *sink) Close() { if s.cancel != nil { diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go index c815286a3cf..13f9afc3a50 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl.go +++ b/cdc/sinkv2/tablesink/table_sink_impl.go @@ -110,7 +110,7 @@ func (e *EventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) error resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - if err := ev.TrySplitAndSortUpdateEvent(); err != nil { + if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.Scheme()); err != nil { return SinkInternalError{err} } // We have to record the event ID for the callback. diff --git a/cdc/sinkv2/tablesink/table_sink_impl_test.go b/cdc/sinkv2/tablesink/table_sink_impl_test.go index 358198eee13..1e5ef54aa59 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl_test.go +++ b/cdc/sinkv2/tablesink/table_sink_impl_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state" + "github.com/pingcap/tiflow/pkg/sink" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -39,6 +40,10 @@ func (m *mockEventSink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) err return nil } +func (m *mockEventSink) Scheme() string { + return sink.BlackHoleScheme +} + func (m *mockEventSink) Close() { close(m.dead) } diff --git a/tests/integration_tests/storage_csv_update/conf/changefeed.toml b/tests/integration_tests/storage_csv_update/conf/changefeed.toml new file mode 100644 index 00000000000..7c6715cf482 --- /dev/null +++ b/tests/integration_tests/storage_csv_update/conf/changefeed.toml @@ -0,0 +1,16 @@ +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true \ No newline at end of file diff --git a/tests/integration_tests/storage_csv_update/conf/diff_config.toml b/tests/integration_tests/storage_csv_update/conf/diff_config.toml new file mode 100644 index 00000000000..18e661f46d3 --- /dev/null +++ b/tests/integration_tests/storage_csv_update/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/storage_csv_update/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/storage_csv_update/data/data.sql b/tests/integration_tests/storage_csv_update/data/data.sql new file mode 100644 index 00000000000..ab756efe1f7 --- /dev/null +++ b/tests/integration_tests/storage_csv_update/data/data.sql @@ -0,0 +1,86 @@ +USE `test`; + + +-- make sure `nullable` can be handled by the mounter and mq encoding protocol +INSERT INTO multi_data_type() VALUES (); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time + , t_enum + , t_set, t_json) +VALUES ( -1, 1, -129, 129, -65536, 65536, -16777216, 16777216, -2147483649, 2147483649 + , true, 123.456, 123.123, 123456789012.123456789012 + , '测', '测试', x'89504E470D0A1A0A', x'89504E470D0A1A0A', '测试tinytext', '测试text', '测试mediumtext', '测试longtext' + , 'tinyblob', 'blob', 'mediumblob', 'longblob' + , '1977-01-01', '9999-12-31 23:59:59', '19731230153000', '23:59:59' + , 'enum2' + , 'a,b', NULL); + +INSERT INTO multi_data_type( t_tinyint, t_tinyint_unsigned, t_smallint, t_smallint_unsigned, t_mediumint + , t_mediumint_unsigned, t_int, t_int_unsigned, t_bigint, t_bigint_unsigned + , t_boolean, t_float, t_double, t_decimal + , t_char, t_varchar, c_binary, c_varbinary, t_tinytext, t_text, t_mediumtext, t_longtext + , t_tinyblob, t_blob, t_mediumblob, t_longblob + , t_date, t_datetime, t_timestamp, t_time + , t_enum + , t_set, t_json) +VALUES ( -2, 2, -130, 130, -65537, 65537, -16777217, 16777217, -2147483650, 2147483650 + , false, 123.4567, 123.1237, 123456789012.1234567890127 + , '2', '测试2', x'89504E470D0A1A0B', x'89504E470D0A1A0B', '测试2tinytext', '测试2text', '测试2mediumtext', '测试longtext' + , 'tinyblob2', 'blob2', 'mediumblob2', 'longblob2' + , '2021-01-01', '2021-12-31 23:59:59', '19731230153000', '22:59:59' + , 'enum1' + , 'a,b,c', '{ + "id": 1, + "name": "hello" + }'); + +UPDATE multi_data_type +SET t_boolean = false +WHERE id = 1; + +DELETE +FROM multi_data_type +WHERE id = 2; + +INSERT INTO multi_charset +VALUES (1, '测试', "中国", "上海", "你好,世界" + , 0xC4E3BAC3CAC0BDE7); + +INSERT INTO multi_charset +VALUES (2, '部署', "美国", "纽约", "世界,你好" + , 0xCAC0BDE7C4E3BAC3); + +UPDATE multi_charset +SET name = '开发' +WHERE name = '测试'; + +DELETE FROM multi_charset +WHERE name = '部署' + AND country = '美国' + AND city = '纽约' + AND description = '世界,你好'; + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); + +INSERT INTO binary_columns (c_binary, c_varbinary, t_tinyblob, t_blob, t_mediumblob, t_longblob) +VALUES ( + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF', + x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF' +); diff --git a/tests/integration_tests/storage_csv_update/data/schema.sql b/tests/integration_tests/storage_csv_update/data/schema.sql new file mode 100644 index 00000000000..f74822a969c --- /dev/null +++ b/tests/integration_tests/storage_csv_update/data/schema.sql @@ -0,0 +1,72 @@ +USE `test`; + +CREATE TABLE `test_update` ( + `id` int(11) NOT NULL, + `uk` varchar(50) DEFAULT NULL, + `other` varchar(100) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk` (`uk`) +); + +CREATE TABLE multi_data_type +( + id INT AUTO_INCREMENT, + t_tinyint TINYINT, + t_tinyint_unsigned TINYINT UNSIGNED, + t_smallint SMALLINT, + t_smallint_unsigned SMALLINT UNSIGNED, + t_mediumint MEDIUMINT, + t_mediumint_unsigned MEDIUMINT UNSIGNED, + t_int INT, + t_int_unsigned INT UNSIGNED, + t_bigint BIGINT, + t_bigint_unsigned BIGINT UNSIGNED, + t_boolean BOOLEAN, + t_float FLOAT(6, 2), + t_double DOUBLE(6, 2), + t_decimal DECIMAL(38, 19), + t_char CHAR, + t_varchar VARCHAR(10), + c_binary binary(16), + c_varbinary varbinary(16), + t_tinytext TINYTEXT, + t_text TEXT, + t_mediumtext MEDIUMTEXT, + t_longtext LONGTEXT, + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + t_date DATE, + t_datetime DATETIME, + t_timestamp TIMESTAMP NULL, + t_time TIME, +-- t_year YEAR, + t_enum ENUM ('enum1', 'enum2', 'enum3'), + t_set SET ('a', 'b', 'c'), +-- t_bit BIT(64), + t_json JSON, + PRIMARY KEY (id) +); + +CREATE TABLE multi_charset ( + id INT, + name varchar(128) CHARACTER SET gbk, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob, + PRIMARY KEY (id) +) ENGINE = InnoDB CHARSET = utf8mb4; + +CREATE TABLE binary_columns +( + id INT AUTO_INCREMENT, + c_binary binary(255), + c_varbinary varbinary(255), + t_tinyblob TINYBLOB, + t_blob BLOB, + t_mediumblob MEDIUMBLOB, + t_longblob LONGBLOB, + PRIMARY KEY (id) +); \ No newline at end of file diff --git a/tests/integration_tests/storage_csv_update/data/update.sql b/tests/integration_tests/storage_csv_update/data/update.sql new file mode 100644 index 00000000000..e1953bf355b --- /dev/null +++ b/tests/integration_tests/storage_csv_update/data/update.sql @@ -0,0 +1,20 @@ +USE `test`; + +INSERT INTO test_update (id, uk, other) +VALUES + (1, 'uk1', 'other1'), + (2, 'uk2', 'other2'), + (3, 'uk3', 'other3'), + (4, 'uk4', 'other4'); + +-- update pk +UPDATE test_update SET id = 100 WHERE id = 1; + +-- update uk +UPDATE test_update SET uk = 'new_uk' WHERE id = 2; + +-- update other column +UPDATE test_update SET other = 'new_other' WHERE id = 3; + +-- update pk and uk +UPDATE test_update SET id = 200, uk = 'new_uk4' WHERE id = 4; \ No newline at end of file diff --git a/tests/integration_tests/storage_csv_update/run.sh b/tests/integration_tests/storage_csv_update/run.sh new file mode 100644 index 00000000000..d3167236624 --- /dev/null +++ b/tests/integration_tests/storage_csv_update/run.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + # Now, we run the storage tests in mysql sink tests. + # It's a temporary solution, we will move it to a new test pipeline later. + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + # Enable tidb extension to generate the commit ts. + SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s&enable-tidb-extension=true" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml + + run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + test_update $* + + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 +} + +function test_update() { + # test update + run_sql_file $CUR/data/update.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 30 + + table_dir="$WORK_DIR/storage_test/test/test_update" + ensure 60 ls "$table_dir" | grep -v "meta" + + version_dir=$(ls $table_dir | grep -v "meta") + date_dir=$(ls $table_dir/$version_dir) + data_name=$(ls $table_dir/$version_dir/$date_dir | grep "csv") + + file_path=$table_dir/$version_dir/$date_dir/$data_name + + echo "start check update" + check_equal "update" "1" "$(cat $file_path | grep 'U' | wc -l)" + check_equal "insert" "7" "$(cat $file_path | grep 'I' | wc -l)" + check_equal "delete" "3" "$(cat $file_path | grep 'D' | wc -l)" +} + +function check_equal() { + if [ "$2" != "$3" ]; then + echo "TEST: [$TEST_NAME] failed on $1, $2 expected, but $3 got" + exit 1 + fi +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 9e9cf224ce31cd83c5f01fb74695a328cc2fd3c4 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 12 Sep 2023 10:33:29 +0800 Subject: [PATCH 2/5] fix comments --- cdc/model/sink.go | 10 ++++++---- cdc/processor/sinkmanager/redo_log_worker.go | 5 +---- cdc/processor/sinkmanager/table_sink_worker.go | 5 +---- cdc/processor/sinkmanager/table_sink_wrapper.go | 8 ++++---- .../sinkmanager/table_sink_wrapper_test.go | 15 +++++---------- 5 files changed, 17 insertions(+), 26 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 765adc5dc8b..1796afd588d 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -773,7 +773,7 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { // TrySplitAndSortUpdateEvent split update events if unique key is updated func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { - if !t.shouldSplitTxn(sinkScheme) { + if !t.shouldSplitUpdateEvent(sinkScheme) { return nil } @@ -785,12 +785,14 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { return nil } -func (t *SingleTableTxn) shouldSplitTxn(sinkScheme string) bool { +func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { + // For mysql sink, we do not split single-row transactions to restore + // upstream events as much as possible. if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) { return false } - // For MQ or storage sink, we need to split the transaction with single row, since some - // protocols (such as csv and avro) do not support old value. + // For MQ and storage sink, we need to split the transaction with single row, since some + // protocols (such as csv and avro) do not support old value in update event. return true } diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 5b674ba568c..dd8dd600710 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -281,10 +281,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size, err = convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) - if err != nil { - return errors.Trace(err) - } + x, size = handleRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) usedMemSize += size rows = append(rows, x...) rowsSize += size diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 50609f8645c..44ce48b4e78 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -381,10 +381,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size, err := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) - if err != nil { - return err - } + x, size := handleRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) events = append(events, x...) allEventSize += size usedMem += size diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 845e8b67f24..a808282125c 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -437,12 +437,12 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6 return false, uint64(0) } -// convertRowChangedEvents uses to convert RowChangedEvents to TableSinkRowChangedEvents. +// handleRowChangedEvents uses to convert RowChangedEvents to TableSinkRowChangedEvents. // It will deal with the old value compatibility. -func convertRowChangedEvents( +func handleRowChangedEvents( changefeed model.ChangeFeedID, tableID model.TableID, enableOldValue bool, events ...*model.PolymorphicEvent, -) ([]*model.RowChangedEvent, uint64, error) { +) ([]*model.RowChangedEvent, uint64) { size := 0 rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events)) for _, e := range events { @@ -472,7 +472,7 @@ func convertRowChangedEvents( size += e.Row.ApproximateBytes() rowChangedEvents = append(rowChangedEvents, e.Row) } - return rowChangedEvents, uint64(size), nil + return rowChangedEvents, uint64(size) } func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 283d1455f2f..718029e4d36 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -145,8 +145,7 @@ func TestConvertNilRowChangedEvents(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldVlaue := false - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...) - require.NoError(t, err) + result, size := handleRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...) require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -167,8 +166,7 @@ func TestConvertEmptyRowChangedEvents(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldValue := false - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size := handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -219,8 +217,7 @@ func TestConvertRowChangedEventsWhenEnableOldValue(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldValue := true - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size := handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) } @@ -272,8 +269,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) { changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) enableOldValue := false - result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size := handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) @@ -318,8 +314,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) { }, }, } - result, size, err = convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...) - require.NoError(t, err) + result, size = handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) } From 8d26ba56d43b316a79980d5483d203cdfebe8791 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 12 Sep 2023 11:05:52 +0800 Subject: [PATCH 3/5] fix oldvalue --- cdc/processor/sinkmanager/manager.go | 13 ++++----- cdc/processor/sinkmanager/redo_log_worker.go | 29 +++++++++---------- .../sinkmanager/table_sink_worker.go | 19 +++++------- .../sinkmanager/table_sink_worker_test.go | 2 +- .../sinkmanager/table_sink_wrapper.go | 3 +- .../sinkmanager/table_sink_wrapper_test.go | 14 ++++----- 6 files changed, 33 insertions(+), 47 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 1e237afd2b1..7e07f051ba4 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -203,7 +203,6 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er }() splitTxn := m.changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn() - enableOldValue := m.changefeedInfo.Config.EnableOldValue gcErrors := make(chan error, 16) sinkErrors := make(chan error, 16) @@ -213,7 +212,7 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er if m.sinkEg == nil { var sinkCtx context.Context m.sinkEg, sinkCtx = errgroup.WithContext(m.managerCtx) - m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn, enableOldValue) + m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn) m.sinkEg.Go(func() error { return m.generateSinkTasks(sinkCtx) }) m.wg.Add(1) go func() { @@ -233,7 +232,7 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er if m.redoDMLMgr != nil && m.redoEg == nil { var redoCtx context.Context m.redoEg, redoCtx = errgroup.WithContext(m.managerCtx) - m.startRedoWorkers(redoCtx, m.redoEg, splitTxn, enableOldValue) + m.startRedoWorkers(redoCtx, m.redoEg, splitTxn) m.redoEg.Go(func() error { return m.generateRedoTasks(redoCtx) }) m.wg.Add(1) go func() { @@ -395,20 +394,20 @@ func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bo return false } -func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) { +func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) { for i := 0; i < sinkWorkerNum; i++ { w := newSinkWorker(m.changefeedID, m.sourceManager, m.sinkMemQuota, m.redoMemQuota, - m.eventCache, splitTxn, enableOldValue) + m.eventCache, splitTxn) m.sinkWorkers = append(m.sinkWorkers, w) eg.Go(func() error { return w.handleTasks(ctx, m.sinkTaskChan) }) } } -func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) { +func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) { for i := 0; i < redoWorkerNum; i++ { w := newRedoWorker(m.changefeedID, m.sourceManager, m.redoMemQuota, - m.redoDMLMgr, m.eventCache, splitTxn, enableOldValue) + m.redoDMLMgr, m.eventCache, splitTxn) m.redoWorkers = append(m.redoWorkers, w) eg.Go(func() error { return w.handleTasks(ctx, m.redoTaskChan) }) } diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index dd8dd600710..2e7b566cd69 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -29,13 +29,12 @@ import ( ) type redoWorker struct { - changefeedID model.ChangeFeedID - sourceManager *sourcemanager.SourceManager - memQuota *memquota.MemQuota - redoDMLMgr redo.DMLManager - eventCache *redoEventCache - splitTxn bool - enableOldValue bool + changefeedID model.ChangeFeedID + sourceManager *sourcemanager.SourceManager + memQuota *memquota.MemQuota + redoDMLMgr redo.DMLManager + eventCache *redoEventCache + splitTxn bool } func newRedoWorker( @@ -45,16 +44,14 @@ func newRedoWorker( redoDMLMgr redo.DMLManager, eventCache *redoEventCache, splitTxn bool, - enableOldValue bool, ) *redoWorker { return &redoWorker{ - changefeedID: changefeedID, - sourceManager: sourceManager, - memQuota: quota, - redoDMLMgr: redoDMLMgr, - eventCache: eventCache, - splitTxn: splitTxn, - enableOldValue: enableOldValue, + changefeedID: changefeedID, + sourceManager: sourceManager, + memQuota: quota, + redoDMLMgr: redoDMLMgr, + eventCache: eventCache, + splitTxn: splitTxn, } } @@ -281,7 +278,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size = handleRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) + x, size = handleRowChangedEvents(w.changefeedID, task.tableID, e) usedMemSize += size rows = append(rows, x...) rowsSize += size diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 44ce48b4e78..d90a35ce3ac 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -39,9 +39,6 @@ type sinkWorker struct { eventCache *redoEventCache // splitTxn indicates whether to split the transaction into multiple batches. splitTxn bool - // enableOldValue indicates whether to enable the old value feature. - // If it is enabled, we need to deal with the compatibility of the data format. - enableOldValue bool metricRedoEventCacheHit prometheus.Counter metricRedoEventCacheMiss prometheus.Counter @@ -55,16 +52,14 @@ func newSinkWorker( redoQuota *memquota.MemQuota, eventCache *redoEventCache, splitTxn bool, - enableOldValue bool, ) *sinkWorker { return &sinkWorker{ - changefeedID: changefeedID, - sourceManager: sourceManager, - sinkMemQuota: sinkQuota, - redoMemQuota: redoQuota, - eventCache: eventCache, - splitTxn: splitTxn, - enableOldValue: enableOldValue, + changefeedID: changefeedID, + sourceManager: sourceManager, + sinkMemQuota: sinkQuota, + redoMemQuota: redoQuota, + eventCache: eventCache, + splitTxn: splitTxn, metricRedoEventCacheHit: RedoEventCacheAccess.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "hit"), metricRedoEventCacheMiss: RedoEventCacheAccess.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "miss"), @@ -381,7 +376,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. e.Row.ReplicatingTs = task.tableSink.replicateTs - x, size := handleRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) + x, size := handleRowChangedEvents(w.changefeedID, task.tableID, e) events = append(events, x...) allEventSize += size usedMem += size diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 984855649c2..a2376b8ae9f 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -48,7 +48,7 @@ func createWorker( quota.AddTable(tableID) } - return newSinkWorker(changefeedID, sm, quota, nil, nil, splitTxn, false), sortEngine + return newSinkWorker(changefeedID, sm, quota, nil, nil, splitTxn), sortEngine } // nolint:unparam diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index a808282125c..7dc99436a4f 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -440,8 +440,7 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6 // handleRowChangedEvents uses to convert RowChangedEvents to TableSinkRowChangedEvents. // It will deal with the old value compatibility. func handleRowChangedEvents( - changefeed model.ChangeFeedID, tableID model.TableID, enableOldValue bool, - events ...*model.PolymorphicEvent, + changefeed model.ChangeFeedID, tableID model.TableID, events ...*model.PolymorphicEvent, ) ([]*model.RowChangedEvent, uint64) { size := 0 rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events)) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 718029e4d36..c1201c23e21 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -144,8 +144,7 @@ func TestConvertNilRowChangedEvents(t *testing.T) { events := []*model.PolymorphicEvent{nil} changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) - enableOldVlaue := false - result, size := handleRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...) + result, size := handleRowChangedEvents(changefeedID, tableID, events...) require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -165,8 +164,7 @@ func TestConvertEmptyRowChangedEvents(t *testing.T) { } changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) - enableOldValue := false - result, size := handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) + result, size := handleRowChangedEvents(changefeedID, tableID, events...) require.Equal(t, 0, len(result)) require.Equal(t, uint64(0), size) } @@ -216,8 +214,7 @@ func TestConvertRowChangedEventsWhenEnableOldValue(t *testing.T) { } changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) - enableOldValue := true - result, size := handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) + result, size := handleRowChangedEvents(changefeedID, tableID, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) } @@ -268,8 +265,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) { } changefeedID := model.DefaultChangeFeedID("1") tableID := model.TableID(1) - enableOldValue := false - result, size := handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) + result, size := handleRowChangedEvents(changefeedID, tableID, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) @@ -314,7 +310,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) { }, }, } - result, size = handleRowChangedEvents(changefeedID, tableID, enableOldValue, events...) + result, size = handleRowChangedEvents(changefeedID, tableID, events...) require.Equal(t, 1, len(result)) require.Equal(t, uint64(216), size) } From e05446a7081583b446f2743fe381bb75a431922d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 12 Sep 2023 13:23:31 +0800 Subject: [PATCH 4/5] add comments --- cdc/model/sink.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 1796afd588d..33449a67d25 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -785,14 +785,19 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { return nil } +// Whether split a single update event into delete and insert events? +// +// For the MySQL Sink, there is no need to split a single unique key changed update event, this +// is also to keep the backward compatibility, the same behavior as before. +// +// For the Kafka and Storage sink, always split a single unique key changed update event, since: +// 1. Avro and CSV does not output the previous column values for the update event, so it would +// cause consumer missing data if the unique key changed event is not split. +// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { - // For mysql sink, we do not split single-row transactions to restore - // upstream events as much as possible. if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) { return false } - // For MQ and storage sink, we need to split the transaction with single row, since some - // protocols (such as csv and avro) do not support old value in update event. return true } From 70e4e85a89405889c5f61a1a5b3e3d00f7c0068f Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 12 Sep 2023 17:44:05 +0800 Subject: [PATCH 5/5] fix test --- tests/integration_tests/multi_changefeed/run.sh | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/tests/integration_tests/multi_changefeed/run.sh b/tests/integration_tests/multi_changefeed/run.sh index 72b60fff2b8..465c765361c 100755 --- a/tests/integration_tests/multi_changefeed/run.sh +++ b/tests/integration_tests/multi_changefeed/run.sh @@ -19,20 +19,11 @@ function check_old_value_enabled() { # When old value is turned on, we will have both column and pre-column in the update. # So here we have 2 (pre val) and 3 (new val). update_with_old_value_count=$(grep "BlackHoleSink: WriteEvents" "$1/cdc.log" | grep 'pre\-columns\\\":\[' | grep '\"columns\\\":\[' | grep 'value\\\":2' | grep -c 'value\\\":3') - if [[ "$update_with_old_value_count" -ne 1 ]]; then + if [[ "$update_with_old_value_count" -ne 2 ]]; then echo "can't found update row with old value" exit 1 fi - # check if exist a update row without `pre-column` - # When old value is turned off, we only have the column in the update. - # So here we only have 3 (new val). - update_without_old_value_count=$(grep "BlackHoleSink: WriteEvents" "$1/cdc.log" | grep 'pre\-columns\\\":null' | grep -c 'value\\\":3') - if [[ "$update_without_old_value_count" -ne 1 ]]; then - echo "can't found update row without old value" - exit 1 - fi - # check delete rows # check if exist a delete row with a complete `pre-column` # When old value is turned on, the pre-column in our delete will include all the columns.