Skip to content

Commit

Permalink
sink(ticdc): add total write bytes counter to sink (pingcap#10040) (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 7, 2024
1 parent d9077ca commit 3fcb14a
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 18 deletions.
10 changes: 6 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,23 +230,25 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
var callbacks []func()
buf := bytes.NewBuffer(make([]byte, 0, task.size))
rowsCnt := 0
bytesCnt := int64(0)
for _, msg := range task.msgs {
d.metricWriteBytes.Add(float64(len(msg.Value)))
bytesCnt += int64(len(msg.Value))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
callbacks = append(callbacks, msg.Callback)
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
if err := d.statistics.RecordBatchExecution(func() (int, int64, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, err
return 0, 0, err
}
return rowsCnt, nil
return rowsCnt, bytesCnt, nil
}); err != nil {
return err
}

d.metricWriteBytes.Add(float64(bytesCnt))
d.metricFileCount.Add(1)
for _, cb := range callbacks {
if cb != nil {
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/mq/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,16 @@ func (w *worker) sendMessages(ctx context.Context) error {
}
for _, message := range future.Messages {
start := time.Now()
if err = w.statistics.RecordBatchExecution(func() (int, error) {
if err = w.statistics.RecordBatchExecution(func() (int, int64, error) {
message.SetPartitionKey(future.PartitionKey)
if err := w.producer.AsyncSendMessage(
ctx,
future.Topic,
future.Partition,
message); err != nil {
return 0, err
return 0, 0, err
}
return message.GetRowsCount(), nil
return message.GetRowsCount(), int64(message.Length()), nil
}); err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,10 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
})
failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(pctx, time.Hour) })

err := s.statistics.RecordBatchExecution(func() (int, error) {
err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
tx, err := s.db.BeginTx(pctx, nil)
if err != nil {
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs)
}
Expand All @@ -767,12 +767,12 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
err = s.multiStmtExecute(pctx, dmls, tx, writeTimeout)
if err != nil {
fallbackToSeqWay = true
return 0, err
return 0, 0, err
}
} else {
err = s.sequenceExecute(pctx, dmls, tx, writeTimeout)
if err != nil {
return 0, err
return 0, 0, err
}
}

Expand All @@ -790,15 +790,15 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
log.Warn("failed to rollback txn", zap.Error(rbErr))
}
}
return 0, err
return 0, 0, err
}

if err = tx.Commit(); err != nil {
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs)
}
return dmls.rowCount, nil
return dmls.rowCount, dmls.approximateSize, nil
})
if err != nil {
return errors.Trace(err)
Expand Down
10 changes: 10 additions & 0 deletions cdc/sink/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ var (
Buckets: prometheus.ExponentialBuckets(1, 2, 18),
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`

// ExecWriteBytesGauge records the total number of bytes written by sink.
TotalWriteBytesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "write_bytes_total",
Help: "Total number of bytes written by sink",
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`

// LargeRowSizeHistogram records size of large rows.
LargeRowSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -69,6 +78,7 @@ var (
// InitMetrics registers all metrics in this file.
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ExecBatchHistogram)
registry.MustRegister(TotalWriteBytesCounter)
registry.MustRegister(ExecDDLHistogram)
registry.MustRegister(LargeRowSizeHistogram)
registry.MustRegister(ExecutionErrorCounter)
Expand Down
8 changes: 6 additions & 2 deletions cdc/sink/metrics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewStatistics(ctx context.Context,
s := sinkType.String()
statistics.metricExecDDLHis = ExecDDLHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricExecBatchHis = ExecBatchHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.WithLabelValues(namespcae, changefeedID, s)
statistics.metricRowSizeHis = LargeRowSizeHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricExecErrCnt = ExecutionErrorCounter.WithLabelValues(namespcae, changefeedID, s)
return statistics
Expand All @@ -55,6 +56,8 @@ type Statistics struct {
metricExecDDLHis prometheus.Observer
// Histogram for DML batch size.
metricExecBatchHis prometheus.Observer
// Counter for total bytes of DML.
metricTotalWriteBytesCnt prometheus.Counter
// Histogram for Row size.
metricRowSizeHis prometheus.Observer
// Counter for sink error.
Expand All @@ -73,13 +76,14 @@ func (b *Statistics) ObserveRows(rows ...*model.RowChangedEvent) {
}

// RecordBatchExecution stats batch executors which return (batchRowCount, error).
func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error {
batchSize, err := executor()
func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
batchSize, batchWriteBytes, err := executor()
if err != nil {
b.metricExecErrCnt.Inc()
return err
}
b.metricExecBatchHis.Observe(float64(batchSize))
b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/avro_basic/conf/changefeed.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[integrity]
integrity-check-level = "correctness"
integrity-check-level = "none"
corruption-handle-level = "warn"
2 changes: 1 addition & 1 deletion tests/integration_tests/avro_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ function run() {
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "NONE"}' http://127.0.0.1:8088/config

# upstream tidb cluster enable row level checksum
run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

TOPIC_NAME="ticdc-avro-test"
# record tso before we create tables to skip the system table DDLs
Expand Down

0 comments on commit 3fcb14a

Please sign in to comment.