Skip to content

Commit

Permalink
sink(cdc): reduce lock when closing table sinks (#9310)
Browse files Browse the repository at this point in the history
ref #9309
  • Loading branch information
hicqu authored Jun 30, 2023
1 parent 7497ea6 commit f74252b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.clearTableSink()
task.tableSink.closeAndClearTableSink()
// After the table sink is cleared all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.span)
Expand Down
68 changes: 36 additions & 32 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err
func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) error {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
// If it's nil it means it's closed.
if t.tableSink != nil {
t.tableSink.AppendRowChangedEvents(events...)
} else {
if t.tableSink == nil {
// If it's nil it means it's closed.
return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
}
t.tableSink.AppendRowChangedEvents(events...)
return nil
}

Expand All @@ -187,15 +185,11 @@ func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
if t.tableSink != nil {
if err := t.tableSink.UpdateResolvedTs(ts); err != nil {
return errors.Trace(err)
}
} else {
if t.tableSink == nil {
// If it's nil it means it's closed.
return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
}
return nil
return t.tableSink.UpdateResolvedTs(ts)
}

func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs {
Expand Down Expand Up @@ -266,7 +260,7 @@ func (t *tableSinkWrapper) markAsClosed() {

func (t *tableSinkWrapper) asyncClose() bool {
t.markAsClosing()
if t.asyncClearTableSink() {
if t.asyncCloseAndClearTableSink() {
t.markAsClosed()
return true
}
Expand All @@ -275,7 +269,7 @@ func (t *tableSinkWrapper) asyncClose() bool {

func (t *tableSinkWrapper) close() {
t.markAsClosing()
t.clearTableSink()
t.closeAndClearTableSink()
t.markAsClosed()
}

Expand All @@ -290,33 +284,43 @@ func (t *tableSinkWrapper) initTableSink() bool {
return true
}

func (t *tableSinkWrapper) asyncClearTableSink() bool {
t.tableSinkMu.Lock()
defer t.tableSinkMu.Unlock()
if t.tableSink != nil {
if !t.tableSink.AsyncClose() {
return false
}
checkpointTs := t.tableSink.GetCheckpointTs()
if t.tableSinkCheckpointTs.Less(checkpointTs) {
t.tableSinkCheckpointTs = checkpointTs
}
t.tableSink = nil
func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
t.tableSinkMu.RLock()
if t.tableSink == nil {
t.tableSinkMu.RUnlock()
return true
}
if !t.tableSink.AsyncClose() {
t.tableSinkMu.RUnlock()
return false
}
t.tableSinkMu.RUnlock()
t.doTableSinkClear()
return true
}

func (t *tableSinkWrapper) clearTableSink() {
func (t *tableSinkWrapper) closeAndClearTableSink() {
t.tableSinkMu.RLock()
if t.tableSink == nil {
t.tableSinkMu.RUnlock()
return
}
t.tableSink.Close()
t.tableSinkMu.RUnlock()
t.doTableSinkClear()
}

func (t *tableSinkWrapper) doTableSinkClear() {
t.tableSinkMu.Lock()
defer t.tableSinkMu.Unlock()
if t.tableSink != nil {
t.tableSink.Close()
checkpointTs := t.tableSink.GetCheckpointTs()
if t.tableSinkCheckpointTs.Less(checkpointTs) {
t.tableSinkCheckpointTs = checkpointTs
}
t.tableSink = nil
if t.tableSink == nil {
return
}
checkpointTs := t.tableSink.GetCheckpointTs()
if t.tableSinkCheckpointTs.Less(checkpointTs) {
t.tableSinkCheckpointTs = checkpointTs
}
t.tableSink = nil
}

// When the attached sink fail, there can be some events that have already been
Expand Down
15 changes: 15 additions & 0 deletions cdc/sink/tablesink/table_sink_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,21 @@ func TestClose(t *testing.T) {
}, time.Second, time.Millisecond*10, "table should be stopped")
}

func TestOperationsAfterClose(t *testing.T) {
t.Parallel()

sink := &mockEventSink{dead: make(chan struct{})}
tb := New[*model.SingleTableTxn](
model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), model.Ts(0),
sink, &dmlsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{}))

require.True(t, tb.AsyncClose())

tb.AppendRowChangedEvents(getTestRows()...)
err := tb.UpdateResolvedTs(model.NewResolvedTs(105))
require.Nil(t, err)
}

func TestCloseCancellable(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit f74252b

Please sign in to comment.