Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#11011
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed May 14, 2024
1 parent 687dea0 commit 3dbf2fc
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
35 changes: 28 additions & 7 deletions cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type encodingWorkerGroup struct {
workerNum int
nextWorker atomic.Uint64

closed chan struct{}
closed chan error
}

func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup {
Expand All @@ -121,19 +121,20 @@ func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup {
inputChs: inputChs,
outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
workerNum: workerNum,
closed: make(chan struct{}),
closed: make(chan error, 1),
}
}

func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
defer func() {
close(e.closed)
log.Warn("redo encoding workers closed",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo fileWorkerGroup closed with error",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
e.closed <- err
}
close(e.closed)
}()
eg, egCtx := errgroup.WithContext(ctx)
for i := 0; i < e.workerNum; i++ {
Expand Down Expand Up @@ -183,8 +184,13 @@ func (e *encodingWorkerGroup) input(
select {
case <-ctx.Done():
return ctx.Err()
<<<<<<< HEAD
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
=======
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
>>>>>>> 52b4301388 (redo(ticdc): return internal error in redo writer (#11011))
case e.inputChs[idx] <- event:
return nil
}
Expand All @@ -196,8 +202,13 @@ func (e *encodingWorkerGroup) output(
select {
case <-ctx.Done():
return ctx.Err()
<<<<<<< HEAD
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
=======
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
>>>>>>> 52b4301388 (redo(ticdc): return internal error in redo writer (#11011))
case e.outputCh <- event:
return nil
}
Expand All @@ -221,8 +232,13 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
<<<<<<< HEAD
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
=======
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
>>>>>>> 52b4301388 (redo(ticdc): return internal error in redo writer (#11011))
case <-flushCh:
}
return nil
Expand All @@ -245,8 +261,13 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro
select {
case <-ctx.Done():
return ctx.Err()
<<<<<<< HEAD
case <-e.closed:
return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed")
=======
case err := <-e.closed:
return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed")
>>>>>>> 52b4301388 (redo(ticdc): return internal error in redo writer (#11011))
case <-ch:
}
}
Expand Down
10 changes: 4 additions & 6 deletions cdc/redo/writer/memory/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ func (f *fileWorkerGroup) Run(
) (err error) {
defer func() {
f.close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo file workers closed with error",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Error(err))
}
log.Warn("redo file workers closed",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Error(err))
}()

eg, egCtx := errgroup.WithContext(ctx)
Expand Down
8 changes: 8 additions & 0 deletions cdc/redo/writer/memory/mem_log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,19 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) {
require.NoError(t, err)

require.ErrorIs(t, lw.Close(), context.Canceled)
<<<<<<< HEAD
require.Eventually(t, func() bool {
err = lw.WriteEvents(ctx, events...)
return err != nil
}, 2*time.Second, 10*time.Millisecond)
require.ErrorContains(t, err, "redo log writer stopped")
err = lw.FlushLog(ctx)
require.ErrorContains(t, err, "redo log writer stopped")
=======

err = lw.WriteEvents(ctx, events...)
require.NoError(t, err)
err = lw.FlushLog(ctx)
require.NoError(t, err)
>>>>>>> 52b4301388 (redo(ticdc): return internal error in redo writer (#11011))
}

0 comments on commit 3dbf2fc

Please sign in to comment.