From 89cdc3c0893d0cfd66754d502efcef5d82b5c99e Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 15 Nov 2023 21:41:43 +0800 Subject: [PATCH] add changefeed log --- cdc/processor/processor.go | 12 ++++++++++-- cdc/redo/manager.go | 23 +++++++++++++++++------ cdc/redo/meta_manager.go | 16 ++++++++++++++-- cdc/redo/writer/memory/encoding_worker.go | 23 ++++++++++++++++------- cdc/redo/writer/memory/file_worker.go | 10 ++++++++-- cdc/redo/writer/memory/mem_log_writer.go | 4 +++- cdc/sink/dmlsink/txn/mysql/mysql.go | 22 +++++++++++++--------- 7 files changed, 81 insertions(+), 29 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 58205b11008..ad06b65fc13 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -748,6 +748,8 @@ func (p *processor) updateBarrierTs(barrier *schedulepb.Barrier) { globalBarrierTs = schemaResolvedTs } log.Debug("update barrierTs", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), zap.Any("tableBarriers", barrier.GetTableBarriers()), zap.Uint64("globalBarrierTs", globalBarrierTs)) @@ -770,7 +772,10 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str retry.WithIsRetryableErr(cerror.IsRetryableError)) if tableName == nil { - log.Warn("failed to get table name for metric", zap.Any("tableID", tableID)) + log.Warn("failed to get table name for metric", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.Any("tableID", tableID)) return strconv.Itoa(int(tableID)) } @@ -858,7 +863,10 @@ func (p *processor) Close() error { zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID)) if err := p.agent.Close(); err != nil { - log.Warn("close agent meet error", zap.Error(err)) + log.Warn("close agent meet error", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.Error(err)) } log.Info("Processor closed agent successfully", zap.String("namespace", p.changefeedID.Namespace), diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index be01d40fdf4..442b3c79634 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -374,7 +374,10 @@ func (m *logManager) GetResolvedTs(span tablepb.Span) model.Ts { func (m *logManager) AddTable(span tablepb.Span, startTs uint64) { _, loaded := m.rtsMap.LoadOrStore(span, &statefulRts{flushed: startTs, unflushed: startTs}) if loaded { - log.Warn("add duplicated table in redo log manager", zap.Stringer("span", &span)) + log.Warn("add duplicated table in redo log manager", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Stringer("span", &span)) return } } @@ -382,7 +385,10 @@ func (m *logManager) AddTable(span tablepb.Span, startTs uint64) { // RemoveTable removes a table from redo log manager func (m *logManager) RemoveTable(span tablepb.Span) { if _, ok := m.rtsMap.LoadAndDelete(span); !ok { - log.Warn("remove a table not maintained in redo log manager", zap.Stringer("span", &span)) + log.Warn("remove a table not maintained in redo log manager", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Stringer("span", &span)) return } } @@ -408,6 +414,8 @@ func (m *logManager) postFlush(tableRtsMap *spanz.HashMap[model.Ts]) { changed := value.(*statefulRts).checkAndSetFlushed(flushed) if !changed { log.Debug("flush redo with regressed resolved ts", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), zap.Stringer("span", &span), zap.Uint64("flushed", flushed), zap.Uint64("current", value.(*statefulRts).getFlushed())) @@ -425,12 +433,15 @@ func (m *logManager) flushLog( *workTimeSlice += time.Since(start) }() if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { - log.Debug("Fail to update flush flag, " + - "the previous flush operation hasn't finished yet") + log.Debug("Fail to update flush flag, "+ + "the previous flush operation hasn't finished yet", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID)) if time.Since(m.lastFlushTime) > redo.FlushWarnDuration { log.Warn("flushLog blocking too long, the redo manager may be stuck", - zap.Duration("duration", time.Since(m.lastFlushTime)), - zap.Any("changfeed", m.cfg.ChangeFeedID)) + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Duration("duration", time.Since(m.lastFlushTime))) } return } diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 3db0d083d86..3569c833681 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -106,6 +106,8 @@ func NewMetaManager( if m.flushIntervalInMs < redo.MinFlushIntervalInMs { log.Warn("redo flush interval is too small, use default value", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), zap.Int64("interval", m.flushIntervalInMs)) m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs } @@ -264,6 +266,8 @@ func (m *metaManager) initMeta(ctx context.Context) error { common.ParseMeta(metas, &checkpointTs, &resolvedTs) if checkpointTs == 0 || resolvedTs == 0 { log.Panic("checkpointTs or resolvedTs is 0 when initializing redo meta in owner", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", resolvedTs)) } @@ -325,11 +329,17 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool { commitTs, fileType, err := redo.ParseLogFileName(path) if err != nil { - log.Error("parse file name failed", zap.String("path", path), zap.Error(err)) + log.Error("parse file name failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Error(err)) return false } if fileType != redo.RedoDDLLogFileType && fileType != redo.RedoRowLogFileType { - log.Panic("unknown file type", zap.String("path", path), zap.Any("fileType", fileType)) + log.Panic("unknown file type", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Any("fileType", fileType)) } // if commitTs == checkPointTs, the DDL may be executed in the owner, @@ -451,6 +461,8 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { m.preMetaFile = metaFile log.Debug("flush meta to s3", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), zap.String("metaFile", metaFile), zap.Any("cost", time.Since(start).Milliseconds())) m.metricFlushLogDuration.Observe(time.Since(start).Seconds()) diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index bd878f65b2b..07e9824d124 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -98,6 +98,7 @@ func (e *polymorphicRedoEvent) encode() (err error) { } type encodingWorkerGroup struct { + changefeed model.ChangeFeedID outputCh chan *polymorphicRedoEvent inputChs []chan *polymorphicRedoEvent workerNum int @@ -106,7 +107,8 @@ type encodingWorkerGroup struct { closed chan struct{} } -func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup { +func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup { + workerNum := cfg.EncodingWorkerNum if workerNum <= 0 { workerNum = redo.DefaultEncodingWorkerNum } @@ -115,10 +117,11 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup { inputChs[i] = make(chan *polymorphicRedoEvent, redo.DefaultEncodingInputChanSize) } return &encodingWorkerGroup{ - inputChs: inputChs, - outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize), - workerNum: workerNum, - closed: make(chan struct{}), + changefeed: cfg.ChangeFeedID, + inputChs: inputChs, + outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize), + workerNum: workerNum, + closed: make(chan struct{}), } } @@ -126,7 +129,10 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { defer func() { close(e.closed) if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("redo fileWorkerGroup closed with error", zap.Error(err)) + log.Warn("redo fileWorkerGroup closed with error", + zap.String("namespace", e.changefeed.Namespace), + zap.String("changefeed", e.changefeed.ID), + zap.Error(err)) } }() eg, egCtx := errgroup.WithContext(ctx) @@ -136,7 +142,10 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { return e.runWorker(egCtx, idx) }) } - log.Info("redo log encoding workers started", zap.Int("workerNum", e.workerNum)) + log.Info("redo log encoding workers started", + zap.String("namespace", e.changefeed.Namespace), + zap.String("changefeed", e.changefeed.ID), + zap.Int("workerNum", e.workerNum)) return eg.Wait() } diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index 0816f2b0d4c..d566ddf66c5 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -156,7 +156,10 @@ func (f *fileWorkerGroup) Run( defer func() { f.close() if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("redo file workers closed with error", zap.Error(err)) + log.Warn("redo file workers closed with error", + zap.String("namespace", f.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", f.cfg.ChangeFeedID.ID), + zap.Error(err)) } }() @@ -169,7 +172,10 @@ func (f *fileWorkerGroup) Run( return f.bgFlushFileCache(egCtx) }) } - log.Info("redo file workers started", zap.Int("workerNum", f.workerNum)) + log.Info("redo file workers started", + zap.String("namespace", f.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", f.cfg.ChangeFeedID.ID), + zap.Int("workerNum", f.workerNum)) return eg.Wait() } diff --git a/cdc/redo/writer/memory/mem_log_writer.go b/cdc/redo/writer/memory/mem_log_writer.go index 25e968c513f..32b5076ed6e 100644 --- a/cdc/redo/writer/memory/mem_log_writer.go +++ b/cdc/redo/writer/memory/mem_log_writer.go @@ -62,7 +62,7 @@ func NewLogWriter( cancel: lwCancel, } - lw.encodeWorkers = newEncodingWorkerGroup(cfg.EncodingWorkerNum) + lw.encodeWorkers = newEncodingWorkerGroup(cfg) eg.Go(func() error { return lw.encodeWorkers.Run(lwCtx) }) @@ -78,6 +78,8 @@ func (l *memoryLogWriter) WriteEvents(ctx context.Context, events ...writer.Redo for _, event := range events { if event == nil { log.Warn("writing nil event to redo log, ignore this", + zap.String("namespace", l.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", l.cfg.ChangeFeedID.ID), zap.String("capture", l.cfg.CaptureID)) continue } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 447d1a00710..eae53d423de 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -170,7 +170,9 @@ func NewMySQLBackends( var maxAllowedPacket int64 maxAllowedPacket, err = pmysql.QueryMaxAllowedPacket(ctx, db) if err != nil { - log.Warn("failed to query max_allowed_packet, use default value", zap.Error(err)) + log.Warn("failed to query max_allowed_packet, use default value", + zap.String("changefeed", changefeed), + zap.Error(err)) maxAllowedPacket = int64(variable.DefMaxAllowedPacket) } @@ -226,13 +228,13 @@ func (s *mysqlBackend) Flush(ctx context.Context) (err error) { } dmls := s.prepareDMLs() - log.Debug("prepare DMLs", zap.Any("rows", s.rows), + log.Debug("prepare DMLs", zap.String("changefeed", s.changefeed), zap.Any("rows", s.rows), zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values)) start := time.Now() if err := s.execDMLWithMaxRetries(ctx, dmls); err != nil { if errors.Cause(err) != context.Canceled { - log.Error("execute DMLs failed", zap.Error(err)) + log.Error("execute DMLs failed", zap.String("changefeed", s.changefeed), zap.Error(err)) } return errors.Trace(err) } @@ -551,6 +553,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { // replicated before, and there is no such row in downstream MySQL. translateToInsert = translateToInsert && firstRow.CommitTs > firstRow.ReplicatingTs log.Debug("translate to insert", + zap.String("changefeed", s.changefeed), zap.Bool("translateToInsert", translateToInsert), zap.Uint64("firstRowCommitTs", firstRow.CommitTs), zap.Uint64("firstRowReplicatingTs", firstRow.ReplicatingTs), @@ -658,7 +661,7 @@ func (s *mysqlBackend) multiStmtExecute( } multiStmtSQL := strings.Join(dmls.sqls, ";") - log.Debug("exec row", zap.Int("workerID", s.workerID), + log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID), zap.String("sql", multiStmtSQL), zap.Any("args", multiStmtArgs)) ctx, cancel := context.WithTimeout(ctx, writeTimeout) defer cancel() @@ -670,7 +673,7 @@ func (s *mysqlBackend) multiStmtExecute( start, s.changefeed, multiStmtSQL, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { - log.Warn("failed to rollback txn", zap.Error(rbErr)) + log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr)) } } return err @@ -685,7 +688,7 @@ func (s *mysqlBackend) sequenceExecute( start := time.Now() for i, query := range dmls.sqls { args := dmls.values[i] - log.Debug("exec row", zap.Int("workerID", s.workerID), + log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID), zap.String("sql", query), zap.Any("args", args)) ctx, cancelFunc := context.WithTimeout(ctx, writeTimeout) @@ -717,7 +720,7 @@ func (s *mysqlBackend) sequenceExecute( start, s.changefeed, query, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { - log.Warn("failed to rollback txn", zap.Error(rbErr)) + log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr)) } } cancelFunc() @@ -731,6 +734,7 @@ func (s *mysqlBackend) sequenceExecute( func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDMLs) error { if len(dmls.sqls) != len(dmls.values) { log.Panic("unexpected number of sqls and values", + zap.String("changefeed", s.changefeed), zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values)) } @@ -787,7 +791,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { - log.Warn("failed to rollback txn", zap.Error(rbErr)) + log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr)) } } return 0, err @@ -804,8 +808,8 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare return errors.Trace(err) } log.Debug("Exec Rows succeeded", - zap.Int("workerID", s.workerID), zap.String("changefeed", s.changefeed), + zap.Int("workerID", s.workerID), zap.Int("numOfRows", dmls.rowCount)) return nil }, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),