Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): fix csv update #9723

Merged
merged 5 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -266,7 +267,7 @@ func (r *RedoLog) GetCommitTs() Ts {
}

// TrySplitAndSortUpdateEvent is a no-op for RedoLog: redo log entries are
// not split, whatever the downstream scheme is. The sinkScheme parameter is
// accepted only to satisfy the shared event interface and is ignored here.
func (r *RedoLog) TrySplitAndSortUpdateEvent(sinkScheme string) error {
return nil
}
}

Expand Down Expand Up @@ -364,7 +365,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent is a no-op for a single RowChangedEvent: there
// is nothing to split or sort at this granularity. The sinkScheme parameter
// is accepted only to satisfy the shared event interface and is ignored.
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(sinkScheme string) error {
return nil
}
}

Expand Down Expand Up @@ -771,10 +772,11 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
if len(t.Rows) < 2 {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
if !t.shouldSplitUpdateEvent(sinkScheme) {
return nil
}

newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
Expand All @@ -783,6 +785,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
return nil
}

// shouldSplitUpdateEvent reports whether the update events in this
// transaction must be split before being handed to the sink.
//
// Non-MySQL (MQ and storage) sinks always require splitting, since some of
// their protocols (such as csv and avro) cannot carry the old value inside
// a single update event. A MySQL-compatible sink splits only multi-row
// transactions, leaving single-row transactions intact so the upstream
// events are restored as faithfully as possible.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
	if !sink.IsMySQLCompatibleScheme(sinkScheme) {
		return true
	}
	// MySQL-compatible sink: split only when the txn has two or more rows.
	return len(t.Rows) >= 2
}

// trySplitAndSortUpdateEvent tries to split update events if a unique key is updated.
// It returns true if some update events are split.
func trySplitAndSortUpdateEvent(
Expand Down
13 changes: 6 additions & 7 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
}()

splitTxn := m.changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn()
enableOldValue := m.changefeedInfo.Config.EnableOldValue

gcErrors := make(chan error, 16)
sinkErrors := make(chan error, 16)
Expand All @@ -213,7 +212,7 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
if m.sinkEg == nil {
var sinkCtx context.Context
m.sinkEg, sinkCtx = errgroup.WithContext(m.managerCtx)
m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn, enableOldValue)
m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn)
m.sinkEg.Go(func() error { return m.generateSinkTasks(sinkCtx) })
m.wg.Add(1)
go func() {
Expand All @@ -233,7 +232,7 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
if m.redoDMLMgr != nil && m.redoEg == nil {
var redoCtx context.Context
m.redoEg, redoCtx = errgroup.WithContext(m.managerCtx)
m.startRedoWorkers(redoCtx, m.redoEg, splitTxn, enableOldValue)
m.startRedoWorkers(redoCtx, m.redoEg, splitTxn)
m.redoEg.Go(func() error { return m.generateRedoTasks(redoCtx) })
m.wg.Add(1)
go func() {
Expand Down Expand Up @@ -395,20 +394,20 @@ func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bo
return false
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager,
m.sinkMemQuota, m.redoMemQuota,
m.eventCache, splitTxn, enableOldValue)
m.eventCache, splitTxn)
m.sinkWorkers = append(m.sinkWorkers, w)
eg.Go(func() error { return w.handleTasks(ctx, m.sinkTaskChan) })
}
}

func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
for i := 0; i < redoWorkerNum; i++ {
w := newRedoWorker(m.changefeedID, m.sourceManager, m.redoMemQuota,
m.redoDMLMgr, m.eventCache, splitTxn, enableOldValue)
m.redoDMLMgr, m.eventCache, splitTxn)
m.redoWorkers = append(m.redoWorkers, w)
eg.Go(func() error { return w.handleTasks(ctx, m.redoTaskChan) })
}
Expand Down
32 changes: 13 additions & 19 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ import (
)

type redoWorker struct {
changefeedID model.ChangeFeedID
sourceManager *sourcemanager.SourceManager
memQuota *memquota.MemQuota
redoDMLMgr redo.DMLManager
eventCache *redoEventCache
splitTxn bool
enableOldValue bool
changefeedID model.ChangeFeedID
sourceManager *sourcemanager.SourceManager
memQuota *memquota.MemQuota
redoDMLMgr redo.DMLManager
eventCache *redoEventCache
splitTxn bool
}

func newRedoWorker(
Expand All @@ -45,16 +44,14 @@ func newRedoWorker(
redoDMLMgr redo.DMLManager,
eventCache *redoEventCache,
splitTxn bool,
enableOldValue bool,
) *redoWorker {
return &redoWorker{
changefeedID: changefeedID,
sourceManager: sourceManager,
memQuota: quota,
redoDMLMgr: redoDMLMgr,
eventCache: eventCache,
splitTxn: splitTxn,
enableOldValue: enableOldValue,
changefeedID: changefeedID,
sourceManager: sourceManager,
memQuota: quota,
redoDMLMgr: redoDMLMgr,
eventCache: eventCache,
splitTxn: splitTxn,
}
}

Expand Down Expand Up @@ -281,10 +278,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err = convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e)
if err != nil {
return errors.Trace(err)
}
x, size = handleRowChangedEvents(w.changefeedID, task.tableID, e)
usedMemSize += size
rows = append(rows, x...)
rowsSize += size
Expand Down
22 changes: 7 additions & 15 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ type sinkWorker struct {
eventCache *redoEventCache
// splitTxn indicates whether to split the transaction into multiple batches.
splitTxn bool
// enableOldValue indicates whether to enable the old value feature.
// If it is enabled, we need to deal with the compatibility of the data format.
enableOldValue bool

metricRedoEventCacheHit prometheus.Counter
metricRedoEventCacheMiss prometheus.Counter
Expand All @@ -55,16 +52,14 @@ func newSinkWorker(
redoQuota *memquota.MemQuota,
eventCache *redoEventCache,
splitTxn bool,
enableOldValue bool,
) *sinkWorker {
return &sinkWorker{
changefeedID: changefeedID,
sourceManager: sourceManager,
sinkMemQuota: sinkQuota,
redoMemQuota: redoQuota,
eventCache: eventCache,
splitTxn: splitTxn,
enableOldValue: enableOldValue,
changefeedID: changefeedID,
sourceManager: sourceManager,
sinkMemQuota: sinkQuota,
redoMemQuota: redoQuota,
eventCache: eventCache,
splitTxn: splitTxn,

metricRedoEventCacheHit: RedoEventCacheAccess.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "hit"),
metricRedoEventCacheMiss: RedoEventCacheAccess.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "miss"),
Expand Down Expand Up @@ -381,10 +376,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e)
if err != nil {
return err
}
x, size := handleRowChangedEvents(w.changefeedID, task.tableID, e)
events = append(events, x...)
allEventSize += size
usedMem += size
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func createWorker(
quota.AddTable(tableID)
}

return newSinkWorker(changefeedID, sm, quota, nil, nil, splitTxn, false), sortEngine
return newSinkWorker(changefeedID, sm, quota, nil, nil, splitTxn), sortEngine
}

// nolint:unparam
Expand Down
89 changes: 6 additions & 83 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,11 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6
return false, uint64(0)
}

// convertRowChangedEvents uses to convert RowChangedEvents to TableSinkRowChangedEvents.
// handleRowChangedEvents is used to convert RowChangedEvents to TableSinkRowChangedEvents.
// It will deal with the old value compatibility.
func convertRowChangedEvents(
changefeed model.ChangeFeedID, tableID model.TableID, enableOldValue bool,
events ...*model.PolymorphicEvent,
) ([]*model.RowChangedEvent, uint64, error) {
func handleRowChangedEvents(
changefeed model.ChangeFeedID, tableID model.TableID, events ...*model.PolymorphicEvent,
) ([]*model.RowChangedEvent, uint64) {
size := 0
rowChangedEvents := make([]*model.RowChangedEvent, 0, len(events))
for _, e := range events {
Expand Down Expand Up @@ -470,85 +469,9 @@ func convertRowChangedEvents(
}

size += e.Row.ApproximateBytes()

// This indicates that it is an update event,
// and after enable old value internally by default(but disable in the configuration).
// We need to handle the update event to be compatible with the old format.
if e.Row.IsUpdate() && !enableOldValue {
if shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if err != nil {
return nil, 0, errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
rowChangedEvents = append(rowChangedEvents, deleteEvent.Row, insertEvent.Row)
} else {
// If the handle key columns are not updated, PreColumns is directly ignored.
e.Row.PreColumns = nil
rowChangedEvents = append(rowChangedEvents, e.Row)
}
} else {
rowChangedEvents = append(rowChangedEvents, e.Row)
}
rowChangedEvents = append(rowChangedEvents, e.Row)
}
return rowChangedEvents, uint64(size), nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column has been modified.
// If the handle key column is modified,
// we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
}

for i := range updateEvent.Row.Columns {
col := updateEvent.Row.Columns[i]
preCol := updateEvent.Row.PreColumns[i]
if col != nil && col.Flag.IsHandleKey() && preCol != nil && preCol.Flag.IsHandleKey() {
colValueString := model.ColumnValueString(col.Value)
preColValueString := model.ColumnValueString(preCol.Value)
// If one handle key columns is updated, we need to split the event row.
if colValueString != preColValueString {
return true
}
}
}
return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(
updateEvent *model.PolymorphicEvent,
) (*model.PolymorphicEvent, *model.PolymorphicEvent, error) {
if updateEvent == nil {
return nil, nil, errors.New("nil event cannot be split")
}

// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
// NOTICE: Here we don't need a full deep copy because
// our two events need Columns and PreColumns respectively,
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *updateEvent
deleteEventRow := *updateEvent.Row
deleteEventRowKV := *updateEvent.RawKV
deleteEvent.Row = &deleteEventRow
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
insertEventRowKV := *updateEvent.RawKV
insertEvent.Row = &insertEventRow
insertEvent.RawKV = &insertEventRowKV
// NOTICE: clean up pre cols for insert event.
insertEvent.Row.PreColumns = nil

return &deleteEvent, &insertEvent, nil
return rowChangedEvents, uint64(size)
}

func genReplicateTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
Expand Down
26 changes: 11 additions & 15 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -61,6 +62,10 @@ func (m *mockSink) GetWriteTimes() int {
return m.writeTimes
}

// Scheme returns the blackhole scheme for this test mock, so callers that
// branch on the sink scheme take the non-MySQL path in tests.
func (m *mockSink) Scheme() string {
return sink.BlackHoleScheme
}

// Close is a no-op: the mock holds no resources that need releasing.
func (m *mockSink) Close() {}

func (m *mockSink) Dead() <-chan struct{} {
Expand Down Expand Up @@ -139,9 +144,7 @@ func TestConvertNilRowChangedEvents(t *testing.T) {
events := []*model.PolymorphicEvent{nil}
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldVlaue := false
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldVlaue, events...)
require.NoError(t, err)
result, size := handleRowChangedEvents(changefeedID, tableID, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}
Expand All @@ -161,9 +164,7 @@ func TestConvertEmptyRowChangedEvents(t *testing.T) {
}
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldValue := false
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
result, size := handleRowChangedEvents(changefeedID, tableID, events...)
require.Equal(t, 0, len(result))
require.Equal(t, uint64(0), size)
}
Expand Down Expand Up @@ -213,9 +214,7 @@ func TestConvertRowChangedEventsWhenEnableOldValue(t *testing.T) {
}
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldValue := true
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
result, size := handleRowChangedEvents(changefeedID, tableID, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(216), size)
}
Expand Down Expand Up @@ -266,10 +265,8 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) {
}
changefeedID := model.DefaultChangeFeedID("1")
tableID := model.TableID(1)
enableOldValue := false
result, size, err := convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
require.Equal(t, 2, len(result))
result, size := handleRowChangedEvents(changefeedID, tableID, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(216), size)

// Update non-handle key.
Expand Down Expand Up @@ -313,8 +310,7 @@ func TestConvertRowChangedEventsWhenDisableOldValue(t *testing.T) {
},
},
}
result, size, err = convertRowChangedEvents(changefeedID, tableID, enableOldValue, events...)
require.NoError(t, err)
result, size = handleRowChangedEvents(changefeedID, tableID, events...)
require.Equal(t, 1, len(result))
require.Equal(t, uint64(216), size)
}
Expand Down
Loading
Loading