Skip to content

Commit

Permalink
sink-mysql(cdc) set variables first and then execute dmls
Browse files Browse the repository at this point in the history
Signed-off-by: zhangjinpeng1987 <[email protected]>
  • Loading branch information
zhangjinpeng87 committed Dec 18, 2023
1 parent 18c21b2 commit d1197ac
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,24 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs)
}

// Set session variables first and then execute the transaction.
// we try to set write source for each txn,
// so we can use it to trace the data source
if err = s.setWriteSource(pctx, tx); err != nil {
err := logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed,
fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source",
s.cfg.SourceID),
dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
return 0, 0, err
}

// If interplated SQL size exceeds maxAllowedPacket, mysql driver will
// fall back to the sequantial way.
// error can be ErrPrepareMulti, ErrBadConn etc.
Expand All @@ -780,23 +798,6 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
}
}

// we try to set write source for each txn,
// so we can use it to trace the data source
if err = s.setWriteSource(pctx, tx); err != nil {
err := logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed,
fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source",
s.cfg.SourceID),
dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
}
}
return 0, 0, err
}

if err = tx.Commit(); err != nil {
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
Expand Down

0 comments on commit d1197ac

Please sign in to comment.