diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 9380b472d69..558d83076d4 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -291,12 +291,6 @@ func (t *tableSinkWrapper) asyncStop() bool { return false } -func (t *tableSinkWrapper) stop() { - t.markAsClosing() - t.closeAndClearTableSink() - t.markAsClosed() -} - // Return true means the internal table sink has been initialized. func (t *tableSinkWrapper) initTableSink() bool { t.tableSinkMu.Lock() @@ -327,9 +321,11 @@ func (t *tableSinkWrapper) closeTableSink() { } func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool { - t.asyncCloseTableSink() - t.doTableSinkClear() - return true + closed := t.asyncCloseTableSink() + if closed { + t.doTableSinkClear() + } + return closed } func (t *tableSinkWrapper) closeAndClearTableSink() { diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 29d8cafbf7f..18e8302b103 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -65,6 +65,22 @@ func (m *mockSink) Dead() <-chan struct{} { return make(chan struct{}) } +type mockDelayedTableSink struct { + tablesink.TableSink + + closeCnt int + closeTarget int +} + +func (t *mockDelayedTableSink) AsyncClose() bool { + t.closeCnt++ + if t.closeCnt >= t.closeTarget { + t.TableSink.Close() + return true + } + return false +} + //nolint:unparam func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) { tableState := tablepb.TableStatePreparing @@ -84,13 +100,26 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table return wrapper, sink } -func TestTableSinkWrapperClose(t *testing.T) { +func TestTableSinkWrapperStop(t *testing.T) { t.Parallel() wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1) + wrapper.tableSink = &mockDelayedTableSink{ + TableSink: wrapper.tableSink, + closeCnt: 0, + closeTarget: 10, + } require.Equal(t, tablepb.TableStatePreparing, wrapper.getState()) - wrapper.stop() + + closeCnt := 0 + for { + closeCnt++ + if wrapper.asyncStop() { + break + } + } require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped") + require.Equal(t, 10, closeCnt, "table sink should be closed 10 times") } func TestUpdateReceivedSorterResolvedTs(t *testing.T) {