From ef317bfff194ca5739764aa55f057ae03d11dc60 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 18 Oct 2023 15:22:59 +0800 Subject: [PATCH] kafka(ticdc): fix kafka dml sink report error to sink manager (#9879) (#9880) ref pingcap/tiflow#9855 --- cdc/sinkv2/eventsink/mq/mq_dml_sink.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go index 6bf485d38d1..85f140d157e 100644 --- a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go @@ -117,10 +117,24 @@ func newSink(ctx context.Context, s.alive.Unlock() close(s.dead) - if err != nil && errors.Cause(err) != context.Canceled { + if err != nil { + if errors.Cause(err) == context.Canceled { + if s.cancelErr == nil { + return + } + err = s.cancelErr + } select { - case <-ctx.Done(): case errCh <- err: + log.Warn("mq dml sink meet error", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) + default: + log.Info("mq dml sink meet error, ignored", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) } } }()