diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index aac34148987..73047ebbbfa 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -186,7 +186,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // events have been reported. Then we can continue the table // at the checkpoint position. case tablesink.SinkInternalError: - task.tableSink.clearTableSink() + task.tableSink.closeAndClearTableSink() // After the table sink is cleared all pending events are sent out or dropped. // So we can re-add the table into sinkMemQuota. w.sinkMemQuota.ClearTable(task.tableSink.span) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 09c82123e75..2d71cfd66b8 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -159,13 +159,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) error { t.tableSinkMu.RLock() defer t.tableSinkMu.RUnlock() - // If it's nil it means it's closed. - if t.tableSink != nil { - t.tableSink.AppendRowChangedEvents(events...) - } else { + if t.tableSink == nil { // If it's nil it means it's closed. return tablesink.NewSinkInternalError(errors.New("table sink cleared")) } + t.tableSink.AppendRowChangedEvents(events...) return nil } @@ -187,15 +185,11 @@ func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) { func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error { t.tableSinkMu.RLock() defer t.tableSinkMu.RUnlock() - if t.tableSink != nil { - if err := t.tableSink.UpdateResolvedTs(ts); err != nil { - return errors.Trace(err) - } - } else { + if t.tableSink == nil { // If it's nil it means it's closed. return tablesink.NewSinkInternalError(errors.New("table sink cleared")) } - return nil + return t.tableSink.UpdateResolvedTs(ts) } func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs { @@ -266,7 +260,7 @@ func (t *tableSinkWrapper) markAsClosed() { func (t *tableSinkWrapper) asyncClose() bool { t.markAsClosing() - if t.asyncClearTableSink() { + if t.asyncCloseAndClearTableSink() { t.markAsClosed() return true } @@ -275,7 +269,7 @@ func (t *tableSinkWrapper) asyncClose() bool { func (t *tableSinkWrapper) close() { t.markAsClosing() - t.clearTableSink() + t.closeAndClearTableSink() t.markAsClosed() } @@ -290,33 +284,43 @@ func (t *tableSinkWrapper) initTableSink() bool { return true } -func (t *tableSinkWrapper) asyncClearTableSink() bool { - t.tableSinkMu.Lock() - defer t.tableSinkMu.Unlock() - if t.tableSink != nil { - if !t.tableSink.AsyncClose() { - return false - } - checkpointTs := t.tableSink.GetCheckpointTs() - if t.tableSinkCheckpointTs.Less(checkpointTs) { - t.tableSinkCheckpointTs = checkpointTs - } - t.tableSink = nil +func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool { + t.tableSinkMu.RLock() + if t.tableSink == nil { + t.tableSinkMu.RUnlock() + return true } + if !t.tableSink.AsyncClose() { + t.tableSinkMu.RUnlock() + return false + } + t.tableSinkMu.RUnlock() + t.doTableSinkClear() return true } -func (t *tableSinkWrapper) clearTableSink() { +func (t *tableSinkWrapper) closeAndClearTableSink() { + t.tableSinkMu.RLock() + if t.tableSink == nil { + t.tableSinkMu.RUnlock() + return + } + t.tableSink.Close() + t.tableSinkMu.RUnlock() + t.doTableSinkClear() +} + +func (t *tableSinkWrapper) doTableSinkClear() { t.tableSinkMu.Lock() defer t.tableSinkMu.Unlock() - if t.tableSink != nil { - t.tableSink.Close() - checkpointTs := t.tableSink.GetCheckpointTs() - if t.tableSinkCheckpointTs.Less(checkpointTs) { - t.tableSinkCheckpointTs = checkpointTs - } - t.tableSink = nil + if t.tableSink == nil { + return + } + checkpointTs := t.tableSink.GetCheckpointTs() + if t.tableSinkCheckpointTs.Less(checkpointTs) { + t.tableSinkCheckpointTs = checkpointTs } + t.tableSink = nil } // When the attached sink fail, there can be some events that have already been diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 4aff1adcbea..5dd68d093e5 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -298,6 +298,21 @@ func TestClose(t *testing.T) { }, time.Second, time.Millisecond*10, "table should be stopped") } +func TestOperationsAfterClose(t *testing.T) { + t.Parallel() + + sink := &mockEventSink{dead: make(chan struct{})} + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), model.Ts(0), + sink, &dmlsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + + require.True(t, tb.AsyncClose()) + + tb.AppendRowChangedEvents(getTestRows()...) + err := tb.UpdateResolvedTs(model.NewResolvedTs(105)) + require.Nil(t, err) +} + func TestCloseCancellable(t *testing.T) { t.Parallel()