Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Nov 23, 2023
1 parent 6b92aa4 commit f45c7ae
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/tablesink/table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type TableSink interface {
// the last synced ts means the biggest committs of the events
// that have been flushed to the downstream.
// This is a thread-safe method.
GetLastSyncedTs() model.Ts // 先全部用这个吧,最后再看是否合适
GetLastSyncedTs() model.Ts
// Close closes the table sink.
// After it returns, no more events will be sent out from this capture.
Close()
Expand Down
16 changes: 11 additions & 5 deletions cdc/sink/tablesink/table_sink_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type LastSyncedTsRecord struct {
lastSyncedTs model.Ts
}

func (r *LastSyncedTsRecord) getLastSyncedTs() model.Ts {
r.Lock()
defer r.Unlock()
return r.lastSyncedTs
}

// EventTableSink is a table sink that can write events.

type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct {
Expand Down Expand Up @@ -128,6 +134,7 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err
}
// We have to record the event ID for the callback.
postEventFlushFunc := e.progressTracker.addEvent()
evCommitTs := ev.GetCommitTs()
ce := &dmlsink.CallbackableEvent[E]{
Event: ev,
Callback: func() {
Expand All @@ -137,8 +144,9 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err
{
e.lastSyncedTs.Lock()
defer e.lastSyncedTs.Unlock()
if e.lastSyncedTs.lastSyncedTs < ev.GetCommitTs() {
e.lastSyncedTs.lastSyncedTs = ev.GetCommitTs()

if e.lastSyncedTs.lastSyncedTs < evCommitTs {
e.lastSyncedTs.lastSyncedTs = evCommitTs
}
}
postEventFlushFunc()
Expand Down Expand Up @@ -170,9 +178,7 @@ func (e *EventTableSink[E, P]) GetCheckpointTs() model.ResolvedTs {
// lastSyncedTs means the biggest committs of all the events
// that have been flushed to the downstream.
func (e *EventTableSink[E, P]) GetLastSyncedTs() model.Ts {
e.lastSyncedTs.Lock()
defer e.lastSyncedTs.Unlock()
return e.lastSyncedTs.lastSyncedTs
return e.lastSyncedTs.getLastSyncedTs()
}

// Close closes the table sink.
Expand Down
9 changes: 9 additions & 0 deletions cdc/sink/tablesink/table_sink_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type mockEventSink struct {

func (m *mockEventSink) WriteEvents(rows ...*dmlsink.TxnCallbackableEvent) error {
m.events = append(m.events, rows...)
// for _, event := range rows {
// event.Callback()
// }
return nil
}

Expand Down Expand Up @@ -247,18 +250,21 @@ func TestGetCheckpointTs(t *testing.T) {

tb.AppendRowChangedEvents(getTestRows()...)
require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0")
require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(0), "lastSyncedTs should be not updated")

// One event will be flushed.
err := tb.UpdateResolvedTs(model.NewResolvedTs(101))
require.Nil(t, err)
require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0")
sink.acknowledge(101)
require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101")
require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(101), "lastSyncedTs should be the same as the flushed event")

// Flush all events.
err = tb.UpdateResolvedTs(model.NewResolvedTs(105))
require.Nil(t, err)
require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101")
require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(101), "lastSyncedTs should be not updated")

// Only acknowledge some events.
sink.acknowledge(102)
Expand All @@ -268,10 +274,12 @@ func TestGetCheckpointTs(t *testing.T) {
tb.GetCheckpointTs(),
"checkpointTs should still be 101",
)
require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(102), "lastSyncedTs should be updated")

// Ack all events.
sink.acknowledge(105)
require.Equal(t, model.NewResolvedTs(105), tb.GetCheckpointTs(), "checkpointTs should be 105")
require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(105), "lastSyncedTs should be updated")
}

func TestClose(t *testing.T) {
Expand Down Expand Up @@ -404,4 +412,5 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) {
currentTs := tb.GetCheckpointTs()
sink.acknowledge(105)
require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(105), "lastSyncedTs should not change")
}

0 comments on commit f45c7ae

Please sign in to comment.