diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index acfe4aa5129..acc259d17d9 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -1047,6 +1047,8 @@ func (p *processor) updateBarrierTs(barrier *schedulepb.Barrier) {
 		globalBarrierTs = schemaResolvedTs
 	}
 	log.Debug("update barrierTs",
+		zap.String("namespace", p.changefeedID.Namespace),
+		zap.String("changefeed", p.changefeedID.ID),
 		zap.Any("tableBarriers", barrier.GetTableBarriers()),
 		zap.Uint64("globalBarrierTs", globalBarrierTs))
 	if p.pullBasedSinking {
@@ -1078,7 +1080,10 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str
 		retry.WithIsRetryableErr(cerror.IsRetryableError))
 
 	if tableName == nil {
-		log.Warn("failed to get table name for metric", zap.Any("tableID", tableID))
+		log.Warn("failed to get table name for metric",
+			zap.String("namespace", p.changefeedID.Namespace),
+			zap.String("changefeed", p.changefeedID.ID),
+			zap.Any("tableID", tableID))
 		return strconv.Itoa(int(tableID))
 	}
 
@@ -1303,7 +1308,10 @@ func (p *processor) Close(ctx cdcContext.Context) error {
 		zap.String("namespace", p.changefeedID.Namespace),
 		zap.String("changefeed", p.changefeedID.ID))
 	if err := p.agent.Close(); err != nil {
-		log.Warn("close agent meet error", zap.Error(err))
+		log.Warn("close agent meet error",
+			zap.String("namespace", p.changefeedID.Namespace),
+			zap.String("changefeed", p.changefeedID.ID),
+			zap.Error(err))
 	}
 	log.Info("Processor closed agent successfully",
 		zap.String("namespace", p.changefeedID.Namespace),
diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go
index e07fbdd2288..13d372095b5 100644
--- a/cdc/redo/manager.go
+++ b/cdc/redo/manager.go
@@ -362,7 +362,10 @@ func (m *logManager) GetResolvedTs(tableID model.TableID) model.Ts {
 func (m *logManager) AddTable(tableID model.TableID, startTs uint64) {
 	_, loaded := m.rtsMap.LoadOrStore(tableID, &statefulRts{flushed: startTs, unflushed: startTs})
 	if loaded {
-		log.Warn("add duplicated table in redo log manager", zap.Int64("tableID", tableID))
+		log.Warn("add duplicated table in redo log manager",
+			zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
+			zap.String("changefeed", m.cfg.ChangeFeedID.ID),
+			zap.Int64("tableID", tableID))
 		return
 	}
 }
@@ -370,7 +373,10 @@ func (m *logManager) AddTable(tableID model.TableID, startTs uint64) {
 // RemoveTable removes a table from redo log manager
 func (m *logManager) RemoveTable(tableID model.TableID) {
 	if _, ok := m.rtsMap.LoadAndDelete(tableID); !ok {
-		log.Warn("remove a table not maintained in redo log manager", zap.Int64("tableID", tableID))
+		log.Warn("remove a table not maintained in redo log manager",
+			zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
+			zap.String("changefeed", m.cfg.ChangeFeedID.ID),
+			zap.Int64("tableID", tableID))
 		return
 	}
 }
@@ -397,6 +403,8 @@ func (m *logManager) postFlush(tableRtsMap map[model.TableID]model.Ts) {
 		changed := value.(*statefulRts).checkAndSetFlushed(flushed)
 		if !changed {
 			log.Debug("flush redo with regressed resolved ts",
+				zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
+				zap.String("changefeed", m.cfg.ChangeFeedID.ID),
 				zap.Int64("tableID", tableID),
 				zap.Uint64("flushed", flushed),
 				zap.Uint64("current", value.(*statefulRts).getFlushed()))
@@ -413,12 +421,15 @@ func (m *logManager) flushLog(
 		*workTimeSlice += time.Since(start)
 	}()
 	if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
-		log.Debug("Fail to update flush flag, " +
-			"the previous flush operation hasn't finished yet")
+		log.Debug("Fail to update flush flag, "+
"+ + "the previous flush operation hasn't finished yet", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID)) if time.Since(m.lastFlushTime) > redo.FlushWarnDuration { log.Warn("flushLog blocking too long, the redo manager may be stuck", - zap.Duration("duration", time.Since(m.lastFlushTime)), - zap.Any("changfeed", m.cfg.ChangeFeedID)) + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Duration("duration", time.Since(m.lastFlushTime))) } return } diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 7c31403b175..d4d6d034a80 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -105,6 +105,8 @@ func NewMetaManager( if m.flushIntervalInMs < redo.MinFlushIntervalInMs { log.Warn("redo flush interval is too small, use default value", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), zap.Int64("interval", m.flushIntervalInMs)) m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs } @@ -259,6 +261,8 @@ func (m *metaManager) initMeta(ctx context.Context) error { common.ParseMeta(metas, &checkpointTs, &resolvedTs) if checkpointTs == 0 || resolvedTs == 0 { log.Panic("checkpointTs or resolvedTs is 0 when initializing redo meta in owner", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", resolvedTs)) } @@ -319,11 +323,17 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool { commitTs, fileType, err := redo.ParseLogFileName(path) if err != nil { - log.Error("parse file name failed", zap.String("path", path), zap.Error(err)) + log.Error("parse file name failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Error(err)) return false } if fileType != redo.RedoDDLLogFileType && fileType != redo.RedoRowLogFileType { - log.Panic("unknown file type", zap.String("path", path), zap.Any("fileType", fileType)) + log.Panic("unknown file type", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Any("fileType", fileType)) } // if commitTs == checkPointTs, the DDL may be executed in the owner, @@ -445,6 +455,8 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { m.preMetaFile = metaFile log.Debug("flush meta to s3", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), zap.String("metaFile", metaFile), zap.Any("cost", time.Since(start).Milliseconds())) m.metricFlushLogDuration.Observe(time.Since(start).Seconds()) diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 7abaeed41ce..883ebb13314 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -97,6 +97,7 @@ func (e *polymorphicRedoEvent) encode() (err error) { } type encodingWorkerGroup struct { + changefeed model.ChangeFeedID outputCh chan *polymorphicRedoEvent inputChs []chan *polymorphicRedoEvent workerNum int @@ -105,7 +106,8 @@ type encodingWorkerGroup struct { closed chan struct{} } -func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup { +func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup { + workerNum := cfg.EncodingWorkerNum if workerNum <= 0 
 		workerNum = redo.DefaultEncodingWorkerNum
 	}
@@ -114,10 +116,11 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup {
 		inputChs[i] = make(chan *polymorphicRedoEvent, redo.DefaultEncodingInputChanSize)
 	}
 	return &encodingWorkerGroup{
-		inputChs:  inputChs,
-		outputCh:  make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
-		workerNum: workerNum,
-		closed:    make(chan struct{}),
+		changefeed: cfg.ChangeFeedID,
+		inputChs:   inputChs,
+		outputCh:   make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
+		workerNum:  workerNum,
+		closed:     make(chan struct{}),
 	}
 }
 
@@ -125,7 +128,10 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
 	defer func() {
 		close(e.closed)
 		if err != nil && errors.Cause(err) != context.Canceled {
-			log.Warn("redo fileWorkerGroup closed with error", zap.Error(err))
+			log.Warn("redo fileWorkerGroup closed with error",
+				zap.String("namespace", e.changefeed.Namespace),
+				zap.String("changefeed", e.changefeed.ID),
+				zap.Error(err))
 		}
 	}()
 	eg, egCtx := errgroup.WithContext(ctx)
@@ -135,7 +141,10 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
 			return e.runWorker(egCtx, idx)
 		})
 	}
-	log.Info("redo log encoding workers started", zap.Int("workerNum", e.workerNum))
+	log.Info("redo log encoding workers started",
+		zap.String("namespace", e.changefeed.Namespace),
+		zap.String("changefeed", e.changefeed.ID),
+		zap.Int("workerNum", e.workerNum))
 	return eg.Wait()
 }
 
diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go
index 0816f2b0d4c..d566ddf66c5 100644
--- a/cdc/redo/writer/memory/file_worker.go
+++ b/cdc/redo/writer/memory/file_worker.go
@@ -156,7 +156,10 @@ func (f *fileWorkerGroup) Run(
 	defer func() {
 		f.close()
 		if err != nil && errors.Cause(err) != context.Canceled {
-			log.Warn("redo file workers closed with error", zap.Error(err))
+			log.Warn("redo file workers closed with error",
+				zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
+				zap.String("changefeed", f.cfg.ChangeFeedID.ID),
+				zap.Error(err))
 		}
 	}()
 
@@ -169,7 +172,10 @@ func (f *fileWorkerGroup) Run(
 			return f.bgFlushFileCache(egCtx)
 		})
 	}
-	log.Info("redo file workers started", zap.Int("workerNum", f.workerNum))
+	log.Info("redo file workers started",
+		zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
+		zap.String("changefeed", f.cfg.ChangeFeedID.ID),
+		zap.Int("workerNum", f.workerNum))
 	return eg.Wait()
 }
 
diff --git a/cdc/redo/writer/memory/mem_log_writer.go b/cdc/redo/writer/memory/mem_log_writer.go
index 25e968c513f..32b5076ed6e 100644
--- a/cdc/redo/writer/memory/mem_log_writer.go
+++ b/cdc/redo/writer/memory/mem_log_writer.go
@@ -62,7 +62,7 @@ func NewLogWriter(
 		cancel: lwCancel,
 	}
 
-	lw.encodeWorkers = newEncodingWorkerGroup(cfg.EncodingWorkerNum)
+	lw.encodeWorkers = newEncodingWorkerGroup(cfg)
 	eg.Go(func() error {
 		return lw.encodeWorkers.Run(lwCtx)
 	})
@@ -78,6 +78,8 @@ func (l *memoryLogWriter) WriteEvents(ctx context.Context, events ...writer.Redo
 	for _, event := range events {
 		if event == nil {
 			log.Warn("writing nil event to redo log, ignore this",
+				zap.String("namespace", l.cfg.ChangeFeedID.Namespace),
+				zap.String("changefeed", l.cfg.ChangeFeedID.ID),
 				zap.String("capture", l.cfg.CaptureID))
 			continue
 		}
diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go
index f7c6b19b731..539aafb92db 100644
--- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go
+++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go
@@ -115,7 +115,9 @@ func NewMySQLBackends(
 	var maxAllowedPacket int64
 	maxAllowedPacket, err = pmysql.QueryMaxAllowedPacket(ctx, db)
 	if err != nil {
-		log.Warn("failed to query max_allowed_packet, use default value", zap.Error(err))
+		log.Warn("failed to query max_allowed_packet, use default value",
+			zap.String("changefeed", changefeed),
+			zap.Error(err))
 		maxAllowedPacket = int64(variable.DefMaxAllowedPacket)
 	}
 
@@ -170,13 +172,13 @@ func (s *mysqlBackend) Flush(ctx context.Context) (err error) {
 	}
 
 	dmls := s.prepareDMLs()
-	log.Debug("prepare DMLs", zap.Any("rows", s.rows),
+	log.Debug("prepare DMLs", zap.String("changefeed", s.changefeed), zap.Any("rows", s.rows),
 		zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values))
 
 	start := time.Now()
 	if err := s.execDMLWithMaxRetries(ctx, dmls); err != nil {
 		if errors.Cause(err) != context.Canceled {
-			log.Error("execute DMLs failed", zap.Error(err))
+			log.Error("execute DMLs failed", zap.String("changefeed", s.changefeed), zap.Error(err))
 		}
 		return errors.Trace(err)
 	}
@@ -505,6 +507,7 @@
 		// replicated before, and there is no such row in downstream MySQL.
 		translateToInsert = translateToInsert && firstRow.CommitTs > firstRow.ReplicatingTs
 		log.Debug("translate to insert",
+			zap.String("changefeed", s.changefeed),
 			zap.Bool("translateToInsert", translateToInsert),
 			zap.Uint64("firstRowCommitTs", firstRow.CommitTs),
 			zap.Uint64("firstRowReplicatingTs", firstRow.ReplicatingTs),
@@ -623,7 +626,7 @@ func (s *mysqlBackend) multiStmtExecute(
 	}
 	multiStmtSQL := strings.Join(dmls.sqls, ";")
 
-	log.Debug("exec row", zap.Int("workerID", s.workerID),
+	log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID),
 		zap.String("sql", multiStmtSQL), zap.Any("args", multiStmtArgs))
 	ctx, cancel := context.WithTimeout(ctx, writeTimeout)
 	defer cancel()
@@ -635,7 +638,7 @@
 			start, s.changefeed, multiStmtSQL, dmls.rowCount, dmls.startTs)
 		if rbErr := tx.Rollback(); rbErr != nil {
 			if errors.Cause(rbErr) != context.Canceled {
-				log.Warn("failed to rollback txn", zap.Error(rbErr))
+				log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
 			}
 		}
 		return err
@@ -650,7 +653,7 @@ func (s *mysqlBackend) sequenceExecute(
 	start := time.Now()
 	for i, query := range dmls.sqls {
 		args := dmls.values[i]
-		log.Debug("exec row", zap.Int("workerID", s.workerID),
+		log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID),
 			zap.String("sql", query), zap.Any("args", args))
 		ctx, cancelFunc := context.WithTimeout(ctx, writeTimeout)
 		_, execError := tx.ExecContext(ctx, query, args...)
@@ -660,7 +663,7 @@ func (s *mysqlBackend) sequenceExecute(
 				start, s.changefeed, query, dmls.rowCount, dmls.startTs)
 			if rbErr := tx.Rollback(); rbErr != nil {
 				if errors.Cause(rbErr) != context.Canceled {
-					log.Warn("failed to rollback txn", zap.Error(rbErr))
+					log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
 				}
 			}
 			cancelFunc()
@@ -674,6 +677,7 @@
 func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDMLs) error {
 	if len(dmls.sqls) != len(dmls.values) {
 		log.Panic("unexpected number of sqls and values",
+			zap.String("changefeed", s.changefeed),
 			zap.Strings("sqls", dmls.sqls),
 			zap.Any("values", dmls.values))
 	}
@@ -726,7 +730,7 @@
 					dmls.rowCount, dmls.startTs)
 				if rbErr := tx.Rollback(); rbErr != nil {
 					if errors.Cause(rbErr) != context.Canceled {
-						log.Warn("failed to rollback txn", zap.Error(rbErr))
+						log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
 					}
 				}
 				return 0, err
@@ -743,8 +747,8 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
 			return errors.Trace(err)
 		}
 		log.Debug("Exec Rows succeeded",
-			zap.Int("workerID", s.workerID),
 			zap.String("changefeed", s.changefeed),
+			zap.Int("workerID", s.workerID),
 			zap.Int("numOfRows", dmls.rowCount))
 		return nil
 	}, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
diff --git a/metrics/alertmanager/ticdc.rules.yml b/metrics/alertmanager/ticdc.rules.yml
index 15166fd809b..15f4eb5fa45 100644
--- a/metrics/alertmanager/ticdc.rules.yml
+++ b/metrics/alertmanager/ticdc.rules.yml
@@ -37,6 +37,18 @@ groups:
       value: '{{ $value }}'
       summary: cdc processor checkpoint delay more than 10 minutes
 
+  - alert: cdc_resolvedts_high_delay
+    expr: ticdc_owner_resolved_ts_lag > 300
+    for: 1m
+    labels:
+      env: ENV_LABELS_ENV
+      level: critical
+      expr: ticdc_owner_resolved_ts_lag > 300
+    annotations:
+      description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
+      value: '{{ $value }}'
+      summary: cdc owner resolved ts delay more than 5 minutes
+
   # changefeed related alter rules
   - alert: ticdc_changefeed_failed
     expr: (max_over_time(ticdc_owner_status[1m]) == 2) > 0