From 33ca2d31ab491094f43dec99e02e4e422723bd8e Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Thu, 28 Mar 2024 18:48:48 +0800
Subject: [PATCH] sink(ticdc): add total write bytes counter to sink (#10040)
 (#10568)

close pingcap/tiflow#10114
---
 cdc/sink/black_hole.go                    |  4 +-
 cdc/sink/metrics/metrics.go               | 12 +++-
 cdc/sink/metrics/statistics.go            | 12 ++--
 cdc/sink/mq/mq_flush_worker.go            |  8 ++-
 cdc/sink/mysql/mysql.go                   | 32 ++++++-----
 cdc/sink/mysql/mysql_test.go              | 57 +++++++++++--------
 .../eventsink/cloudstorage/dml_worker.go  | 14 +++--
 cdc/sinkv2/eventsink/mq/worker.go         |  6 +-
 cdc/sinkv2/eventsink/txn/mysql/mysql.go   | 14 ++---
 cdc/sinkv2/metrics/metrics.go             | 10 ++++
 cdc/sinkv2/metrics/statistics.go          |  8 ++-
 11 files changed, 112 insertions(+), 65 deletions(-)

diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go
index 702f9a50563..aba2acc4329 100644
--- a/cdc/sink/black_hole.go
+++ b/cdc/sink/black_hole.go
@@ -68,12 +68,12 @@ func (b *blackHoleSink) FlushRowChangedEvents(
 		}, nil
 	}
 	log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts))
-	err := b.statistics.RecordBatchExecution(func() (int, error) {
+	err := b.statistics.RecordBatchExecution(func() (int, int64, error) {
 		// TODO: add some random replication latency
 		accumulated := atomic.LoadUint64(&b.accumulated)
 		lastAccumulated := atomic.SwapUint64(&b.lastAccumulated, accumulated)
 		batchSize := accumulated - lastAccumulated
-		return int(batchSize), nil
+		return int(batchSize), int64(batchSize), nil
 	})
 	b.statistics.PrintStatus(ctx)
 	atomic.StoreUint64(&b.flushing, 0)
diff --git a/cdc/sink/metrics/metrics.go b/cdc/sink/metrics/metrics.go
index c74bc90a02d..d7a356d5995 100644
--- a/cdc/sink/metrics/metrics.go
+++ b/cdc/sink/metrics/metrics.go
@@ -43,7 +43,16 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18),
 		}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
 
-	// LargeRowSizeHistogram records the row size of events.
+	// TotalWriteBytesCounter records the total number of bytes written by sink.
+	TotalWriteBytesCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "ticdc",
+			Subsystem: "sink",
+			Name:      "write_bytes_total",
+			Help:      "Total number of bytes written by sink",
+		}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
+
+	// LargeRowSizeHistogram records size of large rows.
 	LargeRowSizeHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
@@ -133,6 +142,7 @@ var (
 func InitMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(ExecBatchHistogram)
 	registry.MustRegister(ExecTxnHistogram)
+	registry.MustRegister(TotalWriteBytesCounter)
 	registry.MustRegister(ExecDDLHistogram)
 	registry.MustRegister(LargeRowSizeHistogram)
 	registry.MustRegister(ExecutionErrorCounter)
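[Editor's note: the hunk above defines the new counter for the v1 sink. For
readers less familiar with the Prometheus Go client, the self-contained sketch
below shows, outside TiCDC, how a CounterVec like this is registered and
incremented per label set; the label values are made up for illustration.]

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        // Mirrors the patch's metric definition: a monotonic counter
        // partitioned by namespace, changefeed, and sink type.
        writeBytes := prometheus.NewCounterVec(prometheus.CounterOpts{
            Namespace: "ticdc",
            Subsystem: "sink",
            Name:      "write_bytes_total",
            Help:      "Total number of bytes written by sink",
        }, []string{"namespace", "changefeed", "type"})

        registry := prometheus.NewRegistry()
        registry.MustRegister(writeBytes)

        // Each flush adds the batch's byte count to its changefeed's series.
        writeBytes.WithLabelValues("default", "cf-1", "mysql").Add(4096)

        families, _ := registry.Gather()
        fmt.Println(families[0].GetName()) // ticdc_sink_write_bytes_total
    }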
diff --git a/cdc/sink/metrics/statistics.go b/cdc/sink/metrics/statistics.go
index af3f8ce488a..707433de7e6 100644
--- a/cdc/sink/metrics/statistics.go
+++ b/cdc/sink/metrics/statistics.go
@@ -69,6 +69,8 @@ func NewStatistics(ctx context.Context, captureAddr string, t sinkType) *Statist
 		WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)
 	statistics.metricExecErrCnt = ExecutionErrorCounter.
 		WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID)
+	statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.
+		WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)
 
 	// Flush metrics in background for better accuracy and efficiency.
 	changefeedID := statistics.changefeedID
@@ -116,8 +118,9 @@ type Statistics struct {
 	metricExecDDLHis   prometheus.Observer
 	metricExecBatchHis prometheus.Observer
 	metricExecErrCnt   prometheus.Counter
-
-	metricRowSizesHis prometheus.Observer
+	// Counter for total bytes of DML.
+	metricTotalWriteBytesCnt prometheus.Counter
+	metricRowSizesHis        prometheus.Observer
 }
 
 // AddRowsCount records total number of rows needs to flush
@@ -142,15 +145,16 @@ func (b *Statistics) AddDDLCount() {
 }
 
 // RecordBatchExecution records the cost time of batch execution and batch size
-func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error {
+func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
 	startTime := time.Now()
-	batchSize, err := executor()
+	batchSize, batchWriteBytes, err := executor()
 	if err != nil {
 		b.metricExecErrCnt.Inc()
 		return err
 	}
 	b.metricExecTxnHis.Observe(time.Since(startTime).Seconds())
 	b.metricExecBatchHis.Observe(float64(batchSize))
+	b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes))
 	atomic.AddUint64(&b.totalFlushedRows, uint64(batchSize))
 	return nil
 }
diff --git a/cdc/sink/mq/mq_flush_worker.go b/cdc/sink/mq/mq_flush_worker.go
index 1f2d99afd9e..51cdf39c323 100644
--- a/cdc/sink/mq/mq_flush_worker.go
+++ b/cdc/sink/mq/mq_flush_worker.go
@@ -169,16 +169,18 @@ func (w *flushWorker) asyncSend(
 		}
 	}
 
-	err := w.statistics.RecordBatchExecution(func() (int, error) {
+	err := w.statistics.RecordBatchExecution(func() (int, int64, error) {
 		thisBatchSize := 0
+		thisBatchBytesCnt := 0
 		for _, message := range w.encoder.Build() {
 			err := w.producer.AsyncSendMessage(ctx, key.Topic, key.Partition, message)
 			if err != nil {
-				return 0, err
+				return 0, 0, err
 			}
 			thisBatchSize += message.GetRowsCount()
+			thisBatchBytesCnt += message.Length()
 		}
-		return thisBatchSize, nil
+		return thisBatchSize, int64(thisBatchBytesCnt), nil
 	})
 	if err != nil {
 		return err
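[Editor's note: the signature change above is the heart of the patch; every
executor callback now reports bytes written alongside its row count, and
Statistics feeds both into the metrics in one place. A minimal self-contained
sketch of the new contract, with hypothetical names:]

    package main

    import "fmt"

    // recordBatchExecution mimics the new Statistics.RecordBatchExecution:
    // the executor returns (rowCount, writeBytes, error) instead of the old
    // (rowCount, error), so byte totals can be recorded centrally.
    func recordBatchExecution(executor func() (int, int64, error)) error {
        rows, bytes, err := executor()
        if err != nil {
            return err // an error counter would be bumped here
        }
        fmt.Printf("flushed %d rows, %d bytes\n", rows, bytes)
        return nil
    }

    func main() {
        batch := []string{"INSERT INTO t VALUES (1)", "INSERT INTO t VALUES (2)"}
        _ = recordBatchExecution(func() (int, int64, error) {
            var total int64
            for _, stmt := range batch {
                total += int64(len(stmt)) // stand-in for a real write size
            }
            return len(batch), total, nil
        })
    }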
diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go
index c1aab3beeb1..bfbef9486ab 100644
--- a/cdc/sink/mysql/mysql.go
+++ b/cdc/sink/mysql/mysql.go
@@ -662,10 +662,10 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 	failpoint.Inject("MySQLSinkHangLongTime", func() {
 		time.Sleep(time.Hour)
 	})
-	err := s.statistics.RecordBatchExecution(func() (int, error) {
+	err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
 		tx, err := s.db.BeginTx(pctx, nil)
 		if err != nil {
-			return 0, logDMLTxnErr(
+			return 0, 0, logDMLTxnErr(
 				cerror.WrapError(cerror.ErrMySQLTxnError, err),
 				start, s.params.changefeedID, "BEGIN", dmls.rowCount, dmls.startTs)
 		}
@@ -684,7 +684,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 				}
 			}
 			cancelFunc()
-			return 0, logDMLTxnErr(
+			return 0, 0, logDMLTxnErr(
 				cerror.WrapError(cerror.ErrMySQLTxnError, err),
 				start, s.params.changefeedID, query, dmls.rowCount, dmls.startTs)
 		}
@@ -692,11 +692,11 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 		}
 
 		if err = tx.Commit(); err != nil {
-			return 0, logDMLTxnErr(
+			return 0, 0, logDMLTxnErr(
 				cerror.WrapError(cerror.ErrMySQLTxnError, err),
 				start, s.params.changefeedID, "COMMIT", dmls.rowCount, dmls.startTs)
 		}
-		return dmls.rowCount, nil
+		return dmls.rowCount, dmls.approximateSize, nil
 	})
 	if err != nil {
 		return errors.Trace(err)
@@ -714,10 +714,11 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 }
 
 type preparedDMLs struct {
-	startTs  []model.Ts
-	sqls     []string
-	values   [][]interface{}
-	rowCount int
+	startTs         []model.Ts
+	sqls            []string
+	values          [][]interface{}
+	rowCount        int
+	approximateSize int64
 }
 
 // prepareDMLs converts model.RowChangedEvent list to query string list and args list
@@ -727,6 +728,8 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent) *preparedDMLs {
 	values := make([][]interface{}, 0, len(rows))
 	replaces := make(map[string][][]interface{})
 	rowCount := 0
+	approximateSize := int64(0)
+
 	// translateToInsert control the update and insert behavior
 	translateToInsert := s.params.enableOldValue && !s.params.safeMode
 	for _, row := range rows {
@@ -768,6 +771,7 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent) *preparedDMLs {
 				values = append(values, args)
 				rowCount++
 			}
+			approximateSize += int64(len(query)) + row.ApproximateDataSize
 			continue
 		}
 
@@ -814,14 +818,16 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent) *preparedDMLs {
 				}
 			}
 		}
+		approximateSize += int64(len(query)) + row.ApproximateDataSize
 	}
 	flushCacheDMLs()
 
 	dmls := &preparedDMLs{
-		startTs:  startTs,
-		sqls:     sqls,
-		values:   values,
-		rowCount: rowCount,
+		startTs:         startTs,
+		sqls:            sqls,
+		values:          values,
+		rowCount:        rowCount,
+		approximateSize: approximateSize,
 	}
 	return dmls
 }
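[Editor's note: for the MySQL sink, "bytes written" is an estimate rather than
a wire-level measurement: prepareDMLs adds the generated SQL text length plus
the event's ApproximateDataSize for every row, and execDMLWithMaxRetries
reports that sum once the transaction commits. This also explains the exact
expected values in the tests below: the fixtures set no ApproximateDataSize,
so each expectation reduces to the statement length. A toy check:]

    package main

    import "fmt"

    // approximateSize mirrors how prepareDMLs accumulates its estimate:
    // generated statement length plus the row's approximate data size.
    func approximateSize(query string, approximateDataSize int64) int64 {
        return int64(len(query)) + approximateDataSize
    }

    func main() {
        q := "DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"
        fmt.Println(approximateSize(q, 0)) // 75, the value asserted in TestPrepareDML
    }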
LIMIT 1;", }, - values: [][]interface{}{{3, 3, 2, 2}}, - rowCount: 1, + values: [][]interface{}{{3, 3, 2, 2}}, + rowCount: 1, + approximateSize: 84, }, }, { name: "update with PK", @@ -2321,8 +2326,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { startTs: []model.Ts{418658114257813516}, sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " + "WHERE `a1`=? AND `a3`=? LIMIT 1;"}, - values: [][]interface{}{{3, 3, 2, 2}}, - rowCount: 1, + values: [][]interface{}{{3, 3, 2, 2}}, + rowCount: 1, + approximateSize: 73, }, }, { name: "batch insert with PK", @@ -2368,8 +2374,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", }, - values: [][]interface{}{{3, 3}, {5, 5}}, - rowCount: 2, + values: [][]interface{}{{3, 3}, {5, 5}}, + rowCount: 2, + approximateSize: 104, }, }, { name: "safe mode on commit ts < replicating ts", @@ -2397,8 +2404,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { sqls: []string{ "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", }, - values: [][]interface{}{{3, 3}}, - rowCount: 1, + values: [][]interface{}{{3, 3}}, + rowCount: 1, + approximateSize: 53, }, }, { name: "safe mode on one row commit ts < replicating ts", @@ -2444,8 +2452,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", }, - values: [][]interface{}{{3, 3}, {5, 5}}, - rowCount: 2, + values: [][]interface{}{{3, 3}, {5, 5}}, + rowCount: 2, + approximateSize: 106, }, }, } diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go index 7766f1780dc..f700170f38d 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go @@ -230,23 +230,24 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) rowsCnt := 0 + bytesCnt := int64(0) for _, msg := range task.msgs { - d.metricWriteBytes.Add(float64(len(msg.Value))) + bytesCnt += int64(len(msg.Value)) rowsCnt += msg.GetRowsCount() buf.Write(msg.Value) callbacks = append(callbacks, msg.Callback) } - if err := d.statistics.RecordBatchExecution(func() (_ int, inErr error) { + if err := d.statistics.RecordBatchExecution(func() (_ int, _ int64, inErr error) { if d.config.FlushConcurrency <= 1 { - return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) + return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) } writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{ Concurrency: d.config.FlushConcurrency, }) if inErr != nil { - return 0, inErr + return 0, 0, inErr } defer func() { @@ -263,13 +264,14 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single } }() if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil { - return 0, inErr + return 0, 0, inErr } - return rowsCnt, nil + return rowsCnt, bytesCnt, nil }); err != nil { return err } + d.metricWriteBytes.Add(float64(bytesCnt)) d.metricFileCount.Add(1) for _, cb := range callbacks { if cb != nil { diff --git a/cdc/sinkv2/eventsink/mq/worker.go b/cdc/sinkv2/eventsink/mq/worker.go index ec183e3f558..dd506563785 100644 --- a/cdc/sinkv2/eventsink/mq/worker.go +++ b/cdc/sinkv2/eventsink/mq/worker.go @@ -291,11 +291,11 @@ func (w *worker) sendMessages(ctx context.Context) error { } for _, message := range future.Messages { start := 
diff --git a/cdc/sinkv2/eventsink/mq/worker.go b/cdc/sinkv2/eventsink/mq/worker.go
index ec183e3f558..dd506563785 100644
--- a/cdc/sinkv2/eventsink/mq/worker.go
+++ b/cdc/sinkv2/eventsink/mq/worker.go
@@ -291,11 +291,11 @@ func (w *worker) sendMessages(ctx context.Context) error {
 			}
 			for _, message := range future.Messages {
 				start := time.Now()
-				if err := w.statistics.RecordBatchExecution(func() (int, error) {
+				if err := w.statistics.RecordBatchExecution(func() (int, int64, error) {
 					if err := w.producer.AsyncSendMessage(ctx, future.Topic, future.Partition, message); err != nil {
-						return 0, err
+						return 0, 0, err
 					}
-					return message.GetRowsCount(), nil
+					return message.GetRowsCount(), int64(message.Length()), nil
 				}); err != nil {
 					return err
 				}
diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go
index 539aafb92db..caccf3c2631 100644
--- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go
+++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go
@@ -695,10 +695,10 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
 	})
 	failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(pctx, time.Hour) })
 
-	err := s.statistics.RecordBatchExecution(func() (int, error) {
+	err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
 		tx, err := s.db.BeginTx(pctx, nil)
 		if err != nil {
-			return 0, logDMLTxnErr(
+			return 0, 0, logDMLTxnErr(
 				cerror.WrapError(cerror.ErrMySQLTxnError, err),
 				start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs)
 		}
@@ -710,12 +710,12 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
 			err = s.multiStmtExecute(pctx, dmls, tx, writeTimeout)
 			if err != nil {
 				fallbackToSeqWay = true
-				return 0, err
+				return 0, 0, err
 			}
 		} else {
 			err = s.sequenceExecute(pctx, dmls, tx, writeTimeout)
 			if err != nil {
-				return 0, err
+				return 0, 0, err
 			}
 		}
 
@@ -733,15 +733,15 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
 					log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr))
 				}
 			}
-			return 0, err
+			return 0, 0, err
 		}
 
 		if err = tx.Commit(); err != nil {
-			return 0, logDMLTxnErr(
+			return 0, 0, logDMLTxnErr(
 				cerror.WrapError(cerror.ErrMySQLTxnError, err),
 				start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs)
 		}
-		return dmls.rowCount, nil
+		return dmls.rowCount, dmls.approximateSize, nil
 	})
 	if err != nil {
 		return errors.Trace(err)
diff --git a/cdc/sinkv2/metrics/metrics.go b/cdc/sinkv2/metrics/metrics.go
index 3add4c54406..f793ba13f52 100644
--- a/cdc/sinkv2/metrics/metrics.go
+++ b/cdc/sinkv2/metrics/metrics.go
@@ -63,6 +63,15 @@ var (
 			Name:      "execution_error",
 			Help:      "Total count of execution errors.",
 		}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
+
+	// TotalWriteBytesCounter records the total number of bytes written by sink.
+	TotalWriteBytesCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "ticdc",
+			Subsystem: "sinkv2",
+			Name:      "write_bytes_total",
+			Help:      "Total number of bytes written by sink",
+		}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
 )
 
 // InitMetrics registers all metrics in this file.
@@ -71,6 +80,7 @@ func InitMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(ExecDDLHistogram)
 	registry.MustRegister(LargeRowSizeHistogram)
 	registry.MustRegister(ExecutionErrorCounter)
+	registry.MustRegister(TotalWriteBytesCounter)
 
 	txn.InitMetrics(registry)
 	mq.InitMetrics(registry)
diff --git a/cdc/sinkv2/metrics/statistics.go b/cdc/sinkv2/metrics/statistics.go
index 520b68d39fc..7e39dba1175 100644
--- a/cdc/sinkv2/metrics/statistics.go
+++ b/cdc/sinkv2/metrics/statistics.go
@@ -38,6 +38,7 @@ func NewStatistics(ctx context.Context, sinkType sink.Type) *Statistics {
 	statistics.metricExecBatchHis = ExecBatchHistogram.WithLabelValues(namespcae, changefeedID, s)
 	statistics.metricRowSizeHis = LargeRowSizeHistogram.WithLabelValues(namespcae, changefeedID, s)
 	statistics.metricExecErrCnt = ExecutionErrorCounter.WithLabelValues(namespcae, changefeedID, s)
+	statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.WithLabelValues(namespcae, changefeedID, s)
 
 	return statistics
 }
@@ -56,6 +57,8 @@ type Statistics struct {
 	metricRowSizeHis prometheus.Observer
 	// Counter for sink error.
 	metricExecErrCnt prometheus.Counter
+	// Counter for total bytes of DML.
+	metricTotalWriteBytesCnt prometheus.Counter
 }
 
 // ObserveRows stats all received `RowChangedEvent`s.
@@ -70,13 +73,14 @@ func (b *Statistics) ObserveRows(rows ...*model.RowChangedEvent) {
 }
 
 // RecordBatchExecution stats batch executors which return (batchRowCount, error).
-func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error {
-	batchSize, err := executor()
+func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
+	batchSize, batchWriteBytes, err := executor()
 	if err != nil {
 		b.metricExecErrCnt.Inc()
 		return err
 	}
 	b.metricExecBatchHis.Observe(float64(batchSize))
+	b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes))
 	return nil
 }
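[Editor's note: net effect of the patch: the v1 sink exposes
ticdc_sink_write_bytes_total and the v2 sink exposes
ticdc_sinkv2_write_bytes_total, both labeled by namespace, changefeed, and
sink type. The reported bytes are approximate and sink-specific: SQL text
length plus approximate row data size for MySQL, encoded message length for
MQ, and flushed file bytes for cloud storage. A typical way to consume either
series would be a Prometheus rate query such as
rate(ticdc_sink_write_bytes_total[1m]) to chart per-changefeed write
throughput, though dashboard wiring is outside this patch.]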