Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mereics(ticdc): add cdc_resolvedts_high_delay alert #10078

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,8 @@ func (p *processor) updateBarrierTs(barrier *schedulepb.Barrier) {
globalBarrierTs = schemaResolvedTs
}
log.Debug("update barrierTs",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Any("tableBarriers", barrier.GetTableBarriers()),
zap.Uint64("globalBarrierTs", globalBarrierTs))

Expand All @@ -773,7 +775,10 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str
retry.WithIsRetryableErr(cerror.IsRetryableError))

if tableName == nil {
log.Warn("failed to get table name for metric", zap.Any("tableID", tableID))
log.Warn("failed to get table name for metric",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Any("tableID", tableID))
return strconv.Itoa(int(tableID))
}

Expand Down Expand Up @@ -861,7 +866,10 @@ func (p *processor) Close() error {
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))
if err := p.agent.Close(); err != nil {
log.Warn("close agent meet error", zap.Error(err))
log.Warn("close agent meet error",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Error(err))
}
log.Info("Processor closed agent successfully",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down
23 changes: 17 additions & 6 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,21 @@ func (m *logManager) GetResolvedTs(span tablepb.Span) model.Ts {
func (m *logManager) AddTable(span tablepb.Span, startTs uint64) {
_, loaded := m.rtsMap.LoadOrStore(span, &statefulRts{flushed: startTs, unflushed: startTs})
if loaded {
log.Warn("add duplicated table in redo log manager", zap.Stringer("span", &span))
log.Warn("add duplicated table in redo log manager",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Stringer("span", &span))
return
}
}

// RemoveTable removes a table from redo log manager
func (m *logManager) RemoveTable(span tablepb.Span) {
if _, ok := m.rtsMap.LoadAndDelete(span); !ok {
log.Warn("remove a table not maintained in redo log manager", zap.Stringer("span", &span))
log.Warn("remove a table not maintained in redo log manager",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Stringer("span", &span))
return
}
}
Expand All @@ -398,6 +404,8 @@ func (m *logManager) postFlush(tableRtsMap *spanz.HashMap[model.Ts]) {
changed := value.(*statefulRts).checkAndSetFlushed(flushed)
if !changed {
log.Debug("flush redo with regressed resolved ts",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("flushed", flushed),
zap.Uint64("current", value.(*statefulRts).getFlushed()))
Expand All @@ -415,12 +423,15 @@ func (m *logManager) flushLog(
*workTimeSlice += time.Since(start)
}()
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
log.Debug("Fail to update flush flag, " +
"the previous flush operation hasn't finished yet")
log.Debug("Fail to update flush flag, "+
"the previous flush operation hasn't finished yet",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID))
if time.Since(m.lastFlushTime) > redo.FlushWarnDuration {
log.Warn("flushLog blocking too long, the redo manager may be stuck",
zap.Duration("duration", time.Since(m.lastFlushTime)),
zap.Any("changfeed", m.cfg.ChangeFeedID))
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(m.lastFlushTime)))
}
return
}
Expand Down
16 changes: 14 additions & 2 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func NewMetaManager(

if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
log.Warn("redo flush interval is too small, use default value",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Int64("interval", m.flushIntervalInMs))
m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}
Expand Down Expand Up @@ -260,6 +262,8 @@ func (m *metaManager) initMeta(ctx context.Context) error {
common.ParseMeta(metas, &checkpointTs, &resolvedTs)
if checkpointTs == 0 || resolvedTs == 0 {
log.Panic("checkpointTs or resolvedTs is 0 when initializing redo meta in owner",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
Expand Down Expand Up @@ -321,11 +325,17 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool {

commitTs, fileType, err := redo.ParseLogFileName(path)
if err != nil {
log.Error("parse file name failed", zap.String("path", path), zap.Error(err))
log.Error("parse file name failed",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.String("path", path), zap.Error(err))
return false
}
if fileType != redo.RedoDDLLogFileType && fileType != redo.RedoRowLogFileType {
log.Panic("unknown file type", zap.String("path", path), zap.Any("fileType", fileType))
log.Panic("unknown file type",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.String("path", path), zap.Any("fileType", fileType))
}

// if commitTs == checkPointTs, the DDL may be executed in the owner,
Expand Down Expand Up @@ -447,6 +457,8 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error {
m.preMetaFile = metaFile

log.Debug("flush meta to s3",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.String("metaFile", metaFile),
zap.Any("cost", time.Since(start).Milliseconds()))
m.metricFlushLogDuration.Observe(time.Since(start).Seconds())
Expand Down
23 changes: 16 additions & 7 deletions cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (e *polymorphicRedoEvent) encode() (err error) {
}

type encodingWorkerGroup struct {
changefeed model.ChangeFeedID
outputCh chan *polymorphicRedoEvent
inputChs []chan *polymorphicRedoEvent
workerNum int
Expand All @@ -106,7 +107,8 @@ type encodingWorkerGroup struct {
closed chan struct{}
}

func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup {
func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup {
workerNum := cfg.EncodingWorkerNum
if workerNum <= 0 {
workerNum = redo.DefaultEncodingWorkerNum
}
Expand All @@ -115,18 +117,22 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup {
inputChs[i] = make(chan *polymorphicRedoEvent, redo.DefaultEncodingInputChanSize)
}
return &encodingWorkerGroup{
inputChs: inputChs,
outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
workerNum: workerNum,
closed: make(chan struct{}),
changefeed: cfg.ChangeFeedID,
inputChs: inputChs,
outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
workerNum: workerNum,
closed: make(chan struct{}),
}
}

func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
defer func() {
close(e.closed)
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo fileWorkerGroup closed with error", zap.Error(err))
log.Warn("redo fileWorkerGroup closed with error",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
}
}()
eg, egCtx := errgroup.WithContext(ctx)
Expand All @@ -136,7 +142,10 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
return e.runWorker(egCtx, idx)
})
}
log.Info("redo log encoding workers started", zap.Int("workerNum", e.workerNum))
log.Info("redo log encoding workers started",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Int("workerNum", e.workerNum))
return eg.Wait()
}

Expand Down
10 changes: 8 additions & 2 deletions cdc/redo/writer/memory/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ func (f *fileWorkerGroup) Run(
defer func() {
f.close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo file workers closed with error", zap.Error(err))
log.Warn("redo file workers closed with error",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}()

Expand All @@ -152,7 +155,10 @@ func (f *fileWorkerGroup) Run(
return f.bgFlushFileCache(egCtx)
})
}
log.Info("redo file workers started", zap.Int("workerNum", f.workerNum))
log.Info("redo file workers started",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Int("workerNum", f.workerNum))
return eg.Wait()
}

Expand Down
4 changes: 3 additions & 1 deletion cdc/redo/writer/memory/mem_log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewLogWriter(
cancel: lwCancel,
}

lw.encodeWorkers = newEncodingWorkerGroup(cfg.EncodingWorkerNum)
lw.encodeWorkers = newEncodingWorkerGroup(cfg)
eg.Go(func() error {
return lw.encodeWorkers.Run(lwCtx)
})
Expand All @@ -79,6 +79,8 @@ func (l *memoryLogWriter) WriteEvents(ctx context.Context, events ...writer.Redo
for _, event := range events {
if event == nil {
log.Warn("writing nil event to redo log, ignore this",
zap.String("namespace", l.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", l.cfg.ChangeFeedID.ID),
zap.String("capture", l.cfg.CaptureID))
continue
}
Expand Down
22 changes: 13 additions & 9 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ func NewMySQLBackends(
var maxAllowedPacket int64
maxAllowedPacket, err = pmysql.QueryMaxAllowedPacket(ctx, db)
if err != nil {
log.Warn("failed to query max_allowed_packet, use default value", zap.Error(err))
log.Warn("failed to query max_allowed_packet, use default value",
zap.String("changefeed", changefeed),
zap.Error(err))
maxAllowedPacket = int64(variable.DefMaxAllowedPacket)
}

Expand Down Expand Up @@ -226,13 +228,13 @@ func (s *mysqlBackend) Flush(ctx context.Context) (err error) {
}

dmls := s.prepareDMLs()
log.Debug("prepare DMLs", zap.Any("rows", s.rows),
log.Debug("prepare DMLs", zap.String("changefeed", s.changefeed), zap.Any("rows", s.rows),
zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values))

start := time.Now()
if err := s.execDMLWithMaxRetries(ctx, dmls); err != nil {
if errors.Cause(err) != context.Canceled {
log.Error("execute DMLs failed", zap.Error(err))
log.Error("execute DMLs failed", zap.String("changefeed", s.changefeed), zap.Error(err))
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -551,6 +553,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
// replicated before, and there is no such row in downstream MySQL.
translateToInsert = translateToInsert && firstRow.CommitTs > firstRow.ReplicatingTs
log.Debug("translate to insert",
zap.String("changefeed", s.changefeed),
zap.Bool("translateToInsert", translateToInsert),
zap.Uint64("firstRowCommitTs", firstRow.CommitTs),
zap.Uint64("firstRowReplicatingTs", firstRow.ReplicatingTs),
Expand Down Expand Up @@ -658,7 +661,7 @@ func (s *mysqlBackend) multiStmtExecute(
}
multiStmtSQL := strings.Join(dmls.sqls, ";")

log.Debug("exec row", zap.Int("workerID", s.workerID),
log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID),
zap.String("sql", multiStmtSQL), zap.Any("args", multiStmtArgs))
ctx, cancel := context.WithTimeout(ctx, writeTimeout)
defer cancel()
Expand All @@ -670,7 +673,7 @@ func (s *mysqlBackend) multiStmtExecute(
start, s.changefeed, multiStmtSQL, dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.Error(rbErr))
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
return err
Expand All @@ -685,7 +688,7 @@ func (s *mysqlBackend) sequenceExecute(
start := time.Now()
for i, query := range dmls.sqls {
args := dmls.values[i]
log.Debug("exec row", zap.Int("workerID", s.workerID),
log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID),
zap.String("sql", query), zap.Any("args", args))
ctx, cancelFunc := context.WithTimeout(ctx, writeTimeout)

Expand Down Expand Up @@ -717,7 +720,7 @@ func (s *mysqlBackend) sequenceExecute(
start, s.changefeed, query, dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.Error(rbErr))
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
cancelFunc()
Expand All @@ -731,6 +734,7 @@ func (s *mysqlBackend) sequenceExecute(
func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDMLs) error {
if len(dmls.sqls) != len(dmls.values) {
log.Panic("unexpected number of sqls and values",
zap.String("changefeed", s.changefeed),
zap.Strings("sqls", dmls.sqls),
zap.Any("values", dmls.values))
}
Expand Down Expand Up @@ -787,7 +791,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.Error(rbErr))
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
return 0, err
Expand All @@ -804,8 +808,8 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
return errors.Trace(err)
}
log.Debug("Exec Rows succeeded",
zap.Int("workerID", s.workerID),
zap.String("changefeed", s.changefeed),
zap.Int("workerID", s.workerID),
zap.Int("numOfRows", dmls.rowCount))
return nil
}, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
Expand Down
12 changes: 12 additions & 0 deletions metrics/alertmanager/ticdc.rules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ groups:
value: '{{ $value }}'
summary: cdc owner checkpoint delay more than 10 minutes

- alert: cdc_resolvedts_high_delay
expr: ticdc_owner_resolved_ts_lag > 300
for: 1m
labels:
env: ENV_LABELS_ENV
level: critical
expr: ticdc_owner_resolved_ts_lag > 300
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
value: '{{ $value }}'
summary: cdc owner resolved ts delay more than 5 minutes

- alert: ticdc_sink_execution_error
expr: changes(ticdc_sink_execution_error[1m]) > 0
for: 1m
Expand Down
Loading