diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 7af258f4d53..d7a1daf55d1 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -230,23 +230,25 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) rowsCnt := 0 + bytesCnt := int64(0) for _, msg := range task.msgs { - d.metricWriteBytes.Add(float64(len(msg.Value))) + bytesCnt += int64(len(msg.Value)) rowsCnt += msg.GetRowsCount() buf.Write(msg.Value) callbacks = append(callbacks, msg.Callback) } - if err := d.statistics.RecordBatchExecution(func() (int, error) { + if err := d.statistics.RecordBatchExecution(func() (int, int64, error) { err := d.storage.WriteFile(ctx, path, buf.Bytes()) if err != nil { - return 0, err + return 0, 0, err } - return rowsCnt, nil + return rowsCnt, bytesCnt, nil }); err != nil { return err } + d.metricWriteBytes.Add(float64(bytesCnt)) d.metricFileCount.Add(1) for _, cb := range callbacks { if cb != nil { diff --git a/cdc/sink/dmlsink/mq/worker.go b/cdc/sink/dmlsink/mq/worker.go index 5e525ba55c5..8f4ca155de7 100644 --- a/cdc/sink/dmlsink/mq/worker.go +++ b/cdc/sink/dmlsink/mq/worker.go @@ -304,16 +304,16 @@ func (w *worker) sendMessages(ctx context.Context) error { } for _, message := range future.Messages { start := time.Now() - if err = w.statistics.RecordBatchExecution(func() (int, error) { + if err = w.statistics.RecordBatchExecution(func() (int, int64, error) { message.SetPartitionKey(future.PartitionKey) if err := w.producer.AsyncSendMessage( ctx, future.Topic, future.Partition, message); err != nil { - return 0, err + return 0, 0, err } - return message.GetRowsCount(), nil + return message.GetRowsCount(), int64(message.Length()), nil }); err != nil { return err } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 447d1a00710..c0f3fe8e61d 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -750,10 +750,10 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare }) failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(pctx, time.Hour) }) - err := s.statistics.RecordBatchExecution(func() (int, error) { + err := s.statistics.RecordBatchExecution(func() (int, int64, error) { tx, err := s.db.BeginTx(pctx, nil) if err != nil { - return 0, logDMLTxnErr( + return 0, 0, logDMLTxnErr( cerror.WrapError(cerror.ErrMySQLTxnError, err), start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs) } @@ -767,12 +767,12 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare err = s.multiStmtExecute(pctx, dmls, tx, writeTimeout) if err != nil { fallbackToSeqWay = true - return 0, err + return 0, 0, err } } else { err = s.sequenceExecute(pctx, dmls, tx, writeTimeout) if err != nil { - return 0, err + return 0, 0, err } } @@ -790,15 +790,15 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare log.Warn("failed to rollback txn", zap.Error(rbErr)) } } - return 0, err + return 0, 0, err } if err = tx.Commit(); err != nil { - return 0, logDMLTxnErr( + return 0, 0, logDMLTxnErr( cerror.WrapError(cerror.ErrMySQLTxnError, err), start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs) } - return dmls.rowCount, nil + return dmls.rowCount, dmls.approximateSize, nil }) if err != nil { return errors.Trace(err) diff --git a/cdc/sink/metrics/metrics.go b/cdc/sink/metrics/metrics.go index 0fdf258e886..1f7cfa7868f 100644 --- a/cdc/sink/metrics/metrics.go +++ b/cdc/sink/metrics/metrics.go @@ -36,6 +36,15 @@ var ( Buckets: prometheus.ExponentialBuckets(1, 2, 18), }, []string{"namespace", "changefeed", "type"}) // type is for `sinkType` + // ExecWriteBytesGauge records the total number of bytes written by sink. + TotalWriteBytesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "write_bytes_total", + Help: "Total number of bytes written by sink", + }, []string{"namespace", "changefeed", "type"}) // type is for `sinkType` + // LargeRowSizeHistogram records size of large rows. LargeRowSizeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -69,6 +78,7 @@ var ( // InitMetrics registers all metrics in this file. func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(ExecBatchHistogram) + registry.MustRegister(TotalWriteBytesCounter) registry.MustRegister(ExecDDLHistogram) registry.MustRegister(LargeRowSizeHistogram) registry.MustRegister(ExecutionErrorCounter) diff --git a/cdc/sink/metrics/statistics.go b/cdc/sink/metrics/statistics.go index 406c09e9cdd..c02001b3d05 100644 --- a/cdc/sink/metrics/statistics.go +++ b/cdc/sink/metrics/statistics.go @@ -39,6 +39,7 @@ func NewStatistics(ctx context.Context, s := sinkType.String() statistics.metricExecDDLHis = ExecDDLHistogram.WithLabelValues(namespcae, changefeedID, s) statistics.metricExecBatchHis = ExecBatchHistogram.WithLabelValues(namespcae, changefeedID, s) + statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.WithLabelValues(namespcae, changefeedID, s) statistics.metricRowSizeHis = LargeRowSizeHistogram.WithLabelValues(namespcae, changefeedID, s) statistics.metricExecErrCnt = ExecutionErrorCounter.WithLabelValues(namespcae, changefeedID, s) return statistics @@ -55,6 +56,8 @@ type Statistics struct { metricExecDDLHis prometheus.Observer // Histogram for DML batch size. metricExecBatchHis prometheus.Observer + // Counter for total bytes of DML. + metricTotalWriteBytesCnt prometheus.Counter // Histogram for Row size. metricRowSizeHis prometheus.Observer // Counter for sink error. @@ -73,13 +76,14 @@ func (b *Statistics) ObserveRows(rows ...*model.RowChangedEvent) { } // RecordBatchExecution stats batch executors which return (batchRowCount, error). -func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error { - batchSize, err := executor() +func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error { + batchSize, batchWriteBytes, err := executor() if err != nil { b.metricExecErrCnt.Inc() return err } b.metricExecBatchHis.Observe(float64(batchSize)) + b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes)) return nil } diff --git a/tests/integration_tests/avro_basic/conf/changefeed.toml b/tests/integration_tests/avro_basic/conf/changefeed.toml index cdcff71a143..5623cb37452 100644 --- a/tests/integration_tests/avro_basic/conf/changefeed.toml +++ b/tests/integration_tests/avro_basic/conf/changefeed.toml @@ -1,3 +1,3 @@ [integrity] -integrity-check-level = "correctness" +integrity-check-level = "none" corruption-handle-level = "warn" diff --git a/tests/integration_tests/avro_basic/run.sh b/tests/integration_tests/avro_basic/run.sh index 6865f5176e5..43e6b60e703 100644 --- a/tests/integration_tests/avro_basic/run.sh +++ b/tests/integration_tests/avro_basic/run.sh @@ -36,7 +36,7 @@ function run() { curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "NONE"}' http://127.0.0.1:8088/config # upstream tidb cluster enable row level checksum - run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + # run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT} TOPIC_NAME="ticdc-avro-test" # record tso before we create tables to skip the system table DDLs