diff --git a/cdc/sink/tablesink/table_sink.go b/cdc/sink/tablesink/table_sink.go index 83cf988ad63..de8d197a208 100644 --- a/cdc/sink/tablesink/table_sink.go +++ b/cdc/sink/tablesink/table_sink.go @@ -36,7 +36,7 @@ type TableSink interface { // the last synced ts means the biggest committs of the events // that have been flushed to the downstream. // This is a thread-safe method. - GetLastSyncedTs() model.Ts // 先全部用这个吧,最后再看是否合适 + GetLastSyncedTs() model.Ts // Close closes the table sink. // After it returns, no more events will be sent out from this capture. Close() diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index 1a33a21be19..3c4a4407045 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -40,6 +40,12 @@ type LastSyncedTsRecord struct { lastSyncedTs model.Ts } +func (r *LastSyncedTsRecord) getLastSyncedTs() model.Ts { + r.Lock() + defer r.Unlock() + return r.lastSyncedTs +} + // EventTableSink is a table sink that can write events. type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct { @@ -128,6 +134,7 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err } // We have to record the event ID for the callback. postEventFlushFunc := e.progressTracker.addEvent() + evCommitTs := ev.GetCommitTs() ce := &dmlsink.CallbackableEvent[E]{ Event: ev, Callback: func() { @@ -137,8 +144,9 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err { e.lastSyncedTs.Lock() defer e.lastSyncedTs.Unlock() - if e.lastSyncedTs.lastSyncedTs < ev.GetCommitTs() { - e.lastSyncedTs.lastSyncedTs = ev.GetCommitTs() + + if e.lastSyncedTs.lastSyncedTs < evCommitTs { + e.lastSyncedTs.lastSyncedTs = evCommitTs } } postEventFlushFunc() @@ -170,9 +178,7 @@ func (e *EventTableSink[E, P]) GetCheckpointTs() model.ResolvedTs { // lastSyncedTs means the biggest committs of all the events // that have been flushed to the downstream. func (e *EventTableSink[E, P]) GetLastSyncedTs() model.Ts { - e.lastSyncedTs.Lock() - defer e.lastSyncedTs.Unlock() - return e.lastSyncedTs.lastSyncedTs + return e.lastSyncedTs.getLastSyncedTs() } // Close closes the table sink. diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 95a26711a25..647328d7747 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -38,6 +38,9 @@ type mockEventSink struct { func (m *mockEventSink) WriteEvents(rows ...*dmlsink.TxnCallbackableEvent) error { m.events = append(m.events, rows...) + // for _, event := range rows { + // event.Callback() + // } return nil } @@ -247,6 +250,7 @@ func TestGetCheckpointTs(t *testing.T) { tb.AppendRowChangedEvents(getTestRows()...) require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(0), "lastSyncedTs should be not updated") // One event will be flushed. err := tb.UpdateResolvedTs(model.NewResolvedTs(101)) @@ -254,11 +258,13 @@ func TestGetCheckpointTs(t *testing.T) { require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0") sink.acknowledge(101) require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(101), "lastSyncedTs should be the same as the flushed event") // Flush all events. err = tb.UpdateResolvedTs(model.NewResolvedTs(105)) require.Nil(t, err) require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(101), "lastSyncedTs should be not updated") // Only acknowledge some events. sink.acknowledge(102) @@ -268,10 +274,12 @@ func TestGetCheckpointTs(t *testing.T) { tb.GetCheckpointTs(), "checkpointTs should still be 101", ) + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(102), "lastSyncedTs should be updated") // Ack all events. sink.acknowledge(105) require.Equal(t, model.NewResolvedTs(105), tb.GetCheckpointTs(), "checkpointTs should be 105") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(105), "lastSyncedTs should be updated") } func TestClose(t *testing.T) { @@ -404,4 +412,5 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) { currentTs := tb.GetCheckpointTs() sink.acknowledge(105) require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(105), "lastSyncedTs should not change") }