Skip to content

Commit

Permalink
redo(ticdc): return internal error in redo writer (#11011)
Browse files Browse the repository at this point in the history
close #10124
  • Loading branch information
CharlesCheung96 committed May 21, 2024
1 parent 288bb64 commit babff66
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 27 deletions.
31 changes: 16 additions & 15 deletions cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type encodingWorkerGroup struct {
workerNum int
nextWorker atomic.Uint64

closed chan struct{}
closed chan error
}

func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup {
Expand All @@ -121,19 +121,20 @@ func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup {
inputChs: inputChs,
outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
workerNum: workerNum,
closed: make(chan struct{}),
closed: make(chan error, 1),
}
}

func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
defer func() {
close(e.closed)
log.Warn("redo encoding workers closed",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo fileWorkerGroup closed with error",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
e.closed <- err
}
close(e.closed)
}()
eg, egCtx := errgroup.WithContext(ctx)
for i := 0; i < e.workerNum; i++ {
Expand Down Expand Up @@ -183,8 +184,8 @@ func (e *encodingWorkerGroup) input(
select {
case <-ctx.Done():
return ctx.Err()
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
case e.inputChs[idx] <- event:
return nil
}
Expand All @@ -196,8 +197,8 @@ func (e *encodingWorkerGroup) output(
select {
case <-ctx.Done():
return ctx.Err()
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
case e.outputCh <- event:
return nil
}
Expand All @@ -221,8 +222,8 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
case <-flushCh:
}
return nil
Expand All @@ -245,8 +246,8 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro
select {
case <-ctx.Done():
return ctx.Err()
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
case <-ch:
}
}
Expand Down
10 changes: 4 additions & 6 deletions cdc/redo/writer/memory/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ func (f *fileWorkerGroup) Run(
) (err error) {
defer func() {
f.close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo file workers closed with error",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Error(err))
}
log.Warn("redo file workers closed",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Error(err))
}()

eg, egCtx := errgroup.WithContext(ctx)
Expand Down
10 changes: 4 additions & 6 deletions cdc/redo/writer/memory/mem_log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) {
require.NoError(t, err)

require.ErrorIs(t, lw.Close(), context.Canceled)
require.Eventually(t, func() bool {
err = lw.WriteEvents(ctx, events...)
return err != nil
}, 2*time.Second, 10*time.Millisecond)
require.ErrorContains(t, err, "redo log writer stopped")

err = lw.WriteEvents(ctx, events...)
require.NoError(t, err)
err = lw.FlushLog(ctx)
require.ErrorContains(t, err, "redo log writer stopped")
require.NoError(t, err)
}

0 comments on commit babff66

Please sign in to comment.