Skip to content

Commit

Permalink
add changefeed log
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Dec 14, 2023
1 parent aca6d4d commit 89cdc3c
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 29 deletions.
12 changes: 10 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,8 @@ func (p *processor) updateBarrierTs(barrier *schedulepb.Barrier) {
globalBarrierTs = schemaResolvedTs
}
log.Debug("update barrierTs",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Any("tableBarriers", barrier.GetTableBarriers()),
zap.Uint64("globalBarrierTs", globalBarrierTs))

Expand All @@ -770,7 +772,10 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str
retry.WithIsRetryableErr(cerror.IsRetryableError))

if tableName == nil {
log.Warn("failed to get table name for metric", zap.Any("tableID", tableID))
log.Warn("failed to get table name for metric",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Any("tableID", tableID))
return strconv.Itoa(int(tableID))
}

Expand Down Expand Up @@ -858,7 +863,10 @@ func (p *processor) Close() error {
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))
if err := p.agent.Close(); err != nil {
log.Warn("close agent meet error", zap.Error(err))
log.Warn("close agent meet error",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Error(err))
}
log.Info("Processor closed agent successfully",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down
23 changes: 17 additions & 6 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,21 @@ func (m *logManager) GetResolvedTs(span tablepb.Span) model.Ts {
func (m *logManager) AddTable(span tablepb.Span, startTs uint64) {
_, loaded := m.rtsMap.LoadOrStore(span, &statefulRts{flushed: startTs, unflushed: startTs})
if loaded {
log.Warn("add duplicated table in redo log manager", zap.Stringer("span", &span))
log.Warn("add duplicated table in redo log manager",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Stringer("span", &span))
return
}
}

// RemoveTable removes a table from redo log manager
func (m *logManager) RemoveTable(span tablepb.Span) {
if _, ok := m.rtsMap.LoadAndDelete(span); !ok {
log.Warn("remove a table not maintained in redo log manager", zap.Stringer("span", &span))
log.Warn("remove a table not maintained in redo log manager",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Stringer("span", &span))
return
}
}
Expand All @@ -408,6 +414,8 @@ func (m *logManager) postFlush(tableRtsMap *spanz.HashMap[model.Ts]) {
changed := value.(*statefulRts).checkAndSetFlushed(flushed)
if !changed {
log.Debug("flush redo with regressed resolved ts",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("flushed", flushed),
zap.Uint64("current", value.(*statefulRts).getFlushed()))
Expand All @@ -425,12 +433,15 @@ func (m *logManager) flushLog(
*workTimeSlice += time.Since(start)
}()
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
log.Debug("Fail to update flush flag, " +
"the previous flush operation hasn't finished yet")
log.Debug("Fail to update flush flag, "+
"the previous flush operation hasn't finished yet",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID))
if time.Since(m.lastFlushTime) > redo.FlushWarnDuration {
log.Warn("flushLog blocking too long, the redo manager may be stuck",
zap.Duration("duration", time.Since(m.lastFlushTime)),
zap.Any("changfeed", m.cfg.ChangeFeedID))
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(m.lastFlushTime)))
}
return
}
Expand Down
16 changes: 14 additions & 2 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func NewMetaManager(

if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
log.Warn("redo flush interval is too small, use default value",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Int64("interval", m.flushIntervalInMs))
m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}
Expand Down Expand Up @@ -264,6 +266,8 @@ func (m *metaManager) initMeta(ctx context.Context) error {
common.ParseMeta(metas, &checkpointTs, &resolvedTs)
if checkpointTs == 0 || resolvedTs == 0 {
log.Panic("checkpointTs or resolvedTs is 0 when initializing redo meta in owner",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
Expand Down Expand Up @@ -325,11 +329,17 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool {

commitTs, fileType, err := redo.ParseLogFileName(path)
if err != nil {
log.Error("parse file name failed", zap.String("path", path), zap.Error(err))
log.Error("parse file name failed",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.String("path", path), zap.Error(err))
return false
}
if fileType != redo.RedoDDLLogFileType && fileType != redo.RedoRowLogFileType {
log.Panic("unknown file type", zap.String("path", path), zap.Any("fileType", fileType))
log.Panic("unknown file type",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.String("path", path), zap.Any("fileType", fileType))
}

// if commitTs == checkPointTs, the DDL may be executed in the owner,
Expand Down Expand Up @@ -451,6 +461,8 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error {
m.preMetaFile = metaFile

log.Debug("flush meta to s3",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.String("metaFile", metaFile),
zap.Any("cost", time.Since(start).Milliseconds()))
m.metricFlushLogDuration.Observe(time.Since(start).Seconds())
Expand Down
23 changes: 16 additions & 7 deletions cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (e *polymorphicRedoEvent) encode() (err error) {
}

type encodingWorkerGroup struct {
changefeed model.ChangeFeedID
outputCh chan *polymorphicRedoEvent
inputChs []chan *polymorphicRedoEvent
workerNum int
Expand All @@ -106,7 +107,8 @@ type encodingWorkerGroup struct {
closed chan struct{}
}

func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup {
func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup {
workerNum := cfg.EncodingWorkerNum
if workerNum <= 0 {
workerNum = redo.DefaultEncodingWorkerNum
}
Expand All @@ -115,18 +117,22 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup {
inputChs[i] = make(chan *polymorphicRedoEvent, redo.DefaultEncodingInputChanSize)
}
return &encodingWorkerGroup{
inputChs: inputChs,
outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
workerNum: workerNum,
closed: make(chan struct{}),
changefeed: cfg.ChangeFeedID,
inputChs: inputChs,
outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
workerNum: workerNum,
closed: make(chan struct{}),
}
}

func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
defer func() {
close(e.closed)
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo fileWorkerGroup closed with error", zap.Error(err))
log.Warn("redo fileWorkerGroup closed with error",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
}
}()
eg, egCtx := errgroup.WithContext(ctx)
Expand All @@ -136,7 +142,10 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
return e.runWorker(egCtx, idx)
})
}
log.Info("redo log encoding workers started", zap.Int("workerNum", e.workerNum))
log.Info("redo log encoding workers started",
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Int("workerNum", e.workerNum))
return eg.Wait()
}

Expand Down
10 changes: 8 additions & 2 deletions cdc/redo/writer/memory/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ func (f *fileWorkerGroup) Run(
defer func() {
f.close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("redo file workers closed with error", zap.Error(err))
log.Warn("redo file workers closed with error",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}()

Expand All @@ -169,7 +172,10 @@ func (f *fileWorkerGroup) Run(
return f.bgFlushFileCache(egCtx)
})
}
log.Info("redo file workers started", zap.Int("workerNum", f.workerNum))
log.Info("redo file workers started",
zap.String("namespace", f.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", f.cfg.ChangeFeedID.ID),
zap.Int("workerNum", f.workerNum))
return eg.Wait()
}

Expand Down
4 changes: 3 additions & 1 deletion cdc/redo/writer/memory/mem_log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewLogWriter(
cancel: lwCancel,
}

lw.encodeWorkers = newEncodingWorkerGroup(cfg.EncodingWorkerNum)
lw.encodeWorkers = newEncodingWorkerGroup(cfg)
eg.Go(func() error {
return lw.encodeWorkers.Run(lwCtx)
})
Expand All @@ -78,6 +78,8 @@ func (l *memoryLogWriter) WriteEvents(ctx context.Context, events ...writer.Redo
for _, event := range events {
if event == nil {
log.Warn("writing nil event to redo log, ignore this",
zap.String("namespace", l.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", l.cfg.ChangeFeedID.ID),
zap.String("capture", l.cfg.CaptureID))
continue
}
Expand Down
22 changes: 13 additions & 9 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ func NewMySQLBackends(
var maxAllowedPacket int64
maxAllowedPacket, err = pmysql.QueryMaxAllowedPacket(ctx, db)
if err != nil {
log.Warn("failed to query max_allowed_packet, use default value", zap.Error(err))
log.Warn("failed to query max_allowed_packet, use default value",
zap.String("changefeed", changefeed),
zap.Error(err))
maxAllowedPacket = int64(variable.DefMaxAllowedPacket)
}

Expand Down Expand Up @@ -226,13 +228,13 @@ func (s *mysqlBackend) Flush(ctx context.Context) (err error) {
}

dmls := s.prepareDMLs()
log.Debug("prepare DMLs", zap.Any("rows", s.rows),
log.Debug("prepare DMLs", zap.String("changefeed", s.changefeed), zap.Any("rows", s.rows),
zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values))

start := time.Now()
if err := s.execDMLWithMaxRetries(ctx, dmls); err != nil {
if errors.Cause(err) != context.Canceled {
log.Error("execute DMLs failed", zap.Error(err))
log.Error("execute DMLs failed", zap.String("changefeed", s.changefeed), zap.Error(err))
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -551,6 +553,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
// replicated before, and there is no such row in downstream MySQL.
translateToInsert = translateToInsert && firstRow.CommitTs > firstRow.ReplicatingTs
log.Debug("translate to insert",
zap.String("changefeed", s.changefeed),
zap.Bool("translateToInsert", translateToInsert),
zap.Uint64("firstRowCommitTs", firstRow.CommitTs),
zap.Uint64("firstRowReplicatingTs", firstRow.ReplicatingTs),
Expand Down Expand Up @@ -658,7 +661,7 @@ func (s *mysqlBackend) multiStmtExecute(
}
multiStmtSQL := strings.Join(dmls.sqls, ";")

log.Debug("exec row", zap.Int("workerID", s.workerID),
log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID),
zap.String("sql", multiStmtSQL), zap.Any("args", multiStmtArgs))
ctx, cancel := context.WithTimeout(ctx, writeTimeout)
defer cancel()
Expand All @@ -670,7 +673,7 @@ func (s *mysqlBackend) multiStmtExecute(
start, s.changefeed, multiStmtSQL, dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.Error(rbErr))
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
return err
Expand All @@ -685,7 +688,7 @@ func (s *mysqlBackend) sequenceExecute(
start := time.Now()
for i, query := range dmls.sqls {
args := dmls.values[i]
log.Debug("exec row", zap.Int("workerID", s.workerID),
log.Debug("exec row", zap.String("changefeed", s.changefeed), zap.Int("workerID", s.workerID),
zap.String("sql", query), zap.Any("args", args))
ctx, cancelFunc := context.WithTimeout(ctx, writeTimeout)

Expand Down Expand Up @@ -717,7 +720,7 @@ func (s *mysqlBackend) sequenceExecute(
start, s.changefeed, query, dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.Error(rbErr))
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
cancelFunc()
Expand All @@ -731,6 +734,7 @@ func (s *mysqlBackend) sequenceExecute(
func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDMLs) error {
if len(dmls.sqls) != len(dmls.values) {
log.Panic("unexpected number of sqls and values",
zap.String("changefeed", s.changefeed),
zap.Strings("sqls", dmls.sqls),
zap.Any("values", dmls.values))
}
Expand Down Expand Up @@ -787,7 +791,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.Error(rbErr))
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
return 0, err
Expand All @@ -804,8 +808,8 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
return errors.Trace(err)
}
log.Debug("Exec Rows succeeded",
zap.Int("workerID", s.workerID),
zap.String("changefeed", s.changefeed),
zap.Int("workerID", s.workerID),
zap.Int("numOfRows", dmls.rowCount))
return nil
}, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
Expand Down

0 comments on commit 89cdc3c

Please sign in to comment.