sink(ticdc): add total write bytes counter to sink (#10040) (#10568)
close #10114
ti-chi-bot authored Mar 28, 2024
1 parent c205ecc commit 33ca2d3
Showing 11 changed files with 112 additions and 65 deletions.
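The heart of the change: `Statistics.RecordBatchExecution` now takes an executor returning `(rows int, bytes int64, err error)` instead of `(rows int, err error)`, and every sink's executor is updated to report how many bytes it flushed. Each per-sink diff below is this same mechanical change. A minimal runnable sketch of the new contract, using a simplified stand-in for the real `Statistics` type in cdc/sink/metrics (field names here are illustrative):

```go
package main

import "fmt"

// statistics is a stand-in for cdc/sink/metrics.Statistics.
type statistics struct {
	totalRows  uint64
	totalBytes int64
}

// RecordBatchExecution mirrors the widened signature: the executor returns
// the number of rows written, the number of bytes written, and an error.
func (s *statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
	rows, bytes, err := executor()
	if err != nil {
		return err
	}
	s.totalRows += uint64(rows)
	s.totalBytes += bytes // feeds ticdc_sink_write_bytes_total
	return nil
}

func main() {
	s := &statistics{}
	_ = s.RecordBatchExecution(func() (int, int64, error) {
		// A sink would perform its write here and report what it flushed.
		return 2, 128, nil
	})
	fmt.Println(s.totalRows, s.totalBytes) // 2 128
}
```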
4 changes: 2 additions & 2 deletions cdc/sink/black_hole.go
@@ -68,12 +68,12 @@ func (b *blackHoleSink) FlushRowChangedEvents(
}, nil
}
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts))
- err := b.statistics.RecordBatchExecution(func() (int, error) {
+ err := b.statistics.RecordBatchExecution(func() (int, int64, error) {
// TODO: add some random replication latency
accumulated := atomic.LoadUint64(&b.accumulated)
lastAccumulated := atomic.SwapUint64(&b.lastAccumulated, accumulated)
batchSize := accumulated - lastAccumulated
- return int(batchSize), nil
+ return int(batchSize), int64(batchSize), nil
})
b.statistics.PrintStatus(ctx)
atomic.StoreUint64(&b.flushing, 0)
12 changes: 11 additions & 1 deletion cdc/sink/metrics/metrics.go
@@ -43,7 +43,16 @@ var (
Buckets: prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18),
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`

- // LargeRowSizeHistogram records the row size of events.
+ // TotalWriteBytesCounter records the total number of bytes written by sink.
+ TotalWriteBytesCounter = prometheus.NewCounterVec(
+     prometheus.CounterOpts{
+         Namespace: "ticdc",
+         Subsystem: "sink",
+         Name:      "write_bytes_total",
+         Help:      "Total number of bytes written by sink",
+     }, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
+
+ // LargeRowSizeHistogram records size of large rows.
LargeRowSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
@@ -133,6 +142,7 @@ var (
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ExecBatchHistogram)
registry.MustRegister(ExecTxnHistogram)
+ registry.MustRegister(TotalWriteBytesCounter)
registry.MustRegister(ExecDDLHistogram)
registry.MustRegister(LargeRowSizeHistogram)
registry.MustRegister(ExecutionErrorCounter)
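Given the namespace/subsystem/name above, the counter is exported as ticdc_sink_write_bytes_total with namespace, changefeed, and type labels. A standalone sketch of registering and incrementing an equivalent counter; label values such as "RowSink" are illustrative, not taken from the commit:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same metric and label names as the metrics.go change above.
	writeBytes := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "write_bytes_total",
		Help:      "Total number of bytes written by sink",
	}, []string{"namespace", "changefeed", "type"})

	registry := prometheus.NewRegistry()
	registry.MustRegister(writeBytes)

	// Each flush adds the bytes it actually wrote, per changefeed and sink type.
	writeBytes.WithLabelValues("default", "cf-1", "RowSink").Add(4096)

	families, _ := registry.Gather()
	fmt.Println(families[0].GetName()) // ticdc_sink_write_bytes_total
}
```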
12 changes: 8 additions & 4 deletions cdc/sink/metrics/statistics.go
@@ -69,6 +69,8 @@ func NewStatistics(ctx context.Context, captureAddr string, t sinkType) *Statist
WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)
statistics.metricExecErrCnt = ExecutionErrorCounter.
WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID)
+ statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.
+     WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)

// Flush metrics in background for better accuracy and efficiency.
changefeedID := statistics.changefeedID
@@ -116,8 +118,9 @@ type Statistics struct {
metricExecDDLHis prometheus.Observer
metricExecBatchHis prometheus.Observer
metricExecErrCnt prometheus.Counter

- metricRowSizesHis prometheus.Observer
+ // Counter for total bytes of DML.
+ metricTotalWriteBytesCnt prometheus.Counter
+ metricRowSizesHis        prometheus.Observer
}

// AddRowsCount records total number of rows needs to flush
@@ -142,15 +145,16 @@ func (b *Statistics) AddDDLCount() {
}

// RecordBatchExecution records the cost time of batch execution and batch size
- func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error {
+ func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
startTime := time.Now()
- batchSize, err := executor()
+ batchSize, batchWriteBytes, err := executor()
if err != nil {
b.metricExecErrCnt.Inc()
return err
}
b.metricExecTxnHis.Observe(time.Since(startTime).Seconds())
b.metricExecBatchHis.Observe(float64(batchSize))
+ b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes))
atomic.AddUint64(&b.totalFlushedRows, uint64(batchSize))
return nil
}
8 changes: 5 additions & 3 deletions cdc/sink/mq/mq_flush_worker.go
@@ -169,16 +169,18 @@ func (w *flushWorker) asyncSend(
}
}

- err := w.statistics.RecordBatchExecution(func() (int, error) {
+ err := w.statistics.RecordBatchExecution(func() (int, int64, error) {
thisBatchSize := 0
+ thisBatchBytesCnt := 0
for _, message := range w.encoder.Build() {
err := w.producer.AsyncSendMessage(ctx, key.Topic, key.Partition, message)
if err != nil {
- return 0, err
+ return 0, 0, err
}
thisBatchSize += message.GetRowsCount()
+ thisBatchBytesCnt += message.Length()
}
- return thisBatchSize, nil
+ return thisBatchSize, int64(thisBatchBytesCnt), nil
})
if err != nil {
return err
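Each encoded MQ message reports its own row count and serialized length, so the executor can accumulate both while sending. A self-contained sketch of that accounting, with a stand-in message type (the real codec message exposes GetRowsCount and Length with the same shapes):

```go
package main

import "fmt"

// message stands in for the codec message used by the MQ sinks.
type message struct {
	rows int
	data []byte
}

func (m message) GetRowsCount() int { return m.rows }
func (m message) Length() int       { return len(m.data) }

// sendBatch mirrors the executor passed to RecordBatchExecution in
// mq_flush_worker.go: abort on the first send error, otherwise report
// the accumulated rows and bytes for the whole batch.
func sendBatch(msgs []message, send func(message) error) (int, int64, error) {
	rows, bytes := 0, 0
	for _, m := range msgs {
		if err := send(m); err != nil {
			return 0, 0, err
		}
		rows += m.GetRowsCount()
		bytes += m.Length()
	}
	return rows, int64(bytes), nil
}

func main() {
	msgs := []message{{rows: 3, data: []byte("abc")}, {rows: 1, data: []byte("defg")}}
	rows, bytes, err := sendBatch(msgs, func(message) error { return nil })
	fmt.Println(rows, bytes, err) // 4 7 <nil>
}
```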
32 changes: 19 additions & 13 deletions cdc/sink/mysql/mysql.go
@@ -662,10 +662,10 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
failpoint.Inject("MySQLSinkHangLongTime", func() {
time.Sleep(time.Hour)
})
- err := s.statistics.RecordBatchExecution(func() (int, error) {
+ err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
tx, err := s.db.BeginTx(pctx, nil)
if err != nil {
- return 0, logDMLTxnErr(
+ return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, "BEGIN", dmls.rowCount, dmls.startTs)
}
@@ -684,19 +684,19 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
}
}
cancelFunc()
- return 0, logDMLTxnErr(
+ return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount, dmls.startTs)
}
cancelFunc()
}

if err = tx.Commit(); err != nil {
- return 0, logDMLTxnErr(
+ return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, "COMMIT", dmls.rowCount, dmls.startTs)
}
- return dmls.rowCount, nil
+ return dmls.rowCount, dmls.approximateSize, nil
})
if err != nil {
return errors.Trace(err)
@@ -714,10 +714,11 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
}

type preparedDMLs struct {
- startTs  []model.Ts
- sqls     []string
- values   [][]interface{}
- rowCount int
+ startTs         []model.Ts
+ sqls            []string
+ values          [][]interface{}
+ rowCount        int
+ approximateSize int64
}

// prepareDMLs converts model.RowChangedEvent list to query string list and args list
@@ -727,6 +728,8 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent) *preparedDMLs {
values := make([][]interface{}, 0, len(rows))
replaces := make(map[string][][]interface{})
rowCount := 0
+ approximateSize := int64(0)

// translateToInsert control the update and insert behavior
translateToInsert := s.params.enableOldValue && !s.params.safeMode
for _, row := range rows {
@@ -768,6 +771,7 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent) *preparedDMLs {
values = append(values, args)
rowCount++
}
+ approximateSize += int64(len(query)) + row.ApproximateDataSize
continue
}

@@ -814,14 +818,16 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent) *preparedDMLs {
}
}
}
+ approximateSize += int64(len(query)) + row.ApproximateDataSize
}
flushCacheDMLs()

dmls := &preparedDMLs{
- startTs:  startTs,
- sqls:     sqls,
- values:   values,
- rowCount: rowCount,
+ startTs:         startTs,
+ sqls:            sqls,
+ values:          values,
+ rowCount:        rowCount,
+ approximateSize: approximateSize,
}
return dmls
}
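The MySQL sink sizes a batch while preparing it rather than while executing it: each row contributes the SQL text length plus the event's ApproximateDataSize, and execDMLWithMaxRetries reports the precomputed total on a successful commit. A reduced sketch of the accumulation (the row type is a stand-in for model.RowChangedEvent):

```go
package main

import "fmt"

// rowChange stands in for model.RowChangedEvent plus its generated SQL.
type rowChange struct {
	query               string
	approximateDataSize int64 // model.RowChangedEvent.ApproximateDataSize
}

// prepareSize mirrors the accumulation in prepareDMLs: SQL length plus
// the row's approximate data size, summed over the batch.
func prepareSize(rows []rowChange) int64 {
	size := int64(0)
	for _, r := range rows {
		size += int64(len(r.query)) + r.approximateDataSize
	}
	return size
}

func main() {
	rows := []rowChange{{query: "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"}}
	fmt.Println(prepareSize(rows)) // 52, matching the test expectations below
}
```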
57 changes: 33 additions & 24 deletions cdc/sink/mysql/mysql_test.go
@@ -88,10 +88,11 @@ func TestPrepareDML(t *testing.T) {
},
},
expected: &preparedDMLs{
- startTs:  []model.Ts{418658114257813514},
- sqls:     []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
- values:   [][]interface{}{{1, 1}},
- rowCount: 1,
+ startTs:         []model.Ts{418658114257813514},
+ sqls:            []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
+ values:          [][]interface{}{{1, 1}},
+ rowCount:        1,
+ approximateSize: 75,
},
}, {
input: []*model.RowChangedEvent{
@@ -114,10 +115,11 @@
},
},
expected: &preparedDMLs{
- startTs:  []model.Ts{418658114257813516},
- sqls:     []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
- values:   [][]interface{}{{2, 2}},
- rowCount: 1,
+ startTs:         []model.Ts{418658114257813516},
+ sqls:            []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
+ values:          [][]interface{}{{2, 2}},
+ rowCount:        1,
+ approximateSize: 64,
},
},
}
@@ -2214,8 +2216,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
sqls: []string{
"INSERT INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);",
},
- values:   [][]interface{}{{1, 1}},
- rowCount: 1,
+ values:          [][]interface{}{{1, 1}},
+ rowCount:        1,
+ approximateSize: 63,
},
}, {
name: "insert with PK",
@@ -2239,10 +2242,11 @@
},
},
expected: &preparedDMLs{
- startTs:  []model.Ts{418658114257813514},
- sqls:     []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
- values:   [][]interface{}{{1, 1}},
- rowCount: 1,
+ startTs:         []model.Ts{418658114257813514},
+ sqls:            []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
+ values:          [][]interface{}{{1, 1}},
+ rowCount:        1,
+ approximateSize: 52,
},
}, {
name: "update without PK",
@@ -2282,8 +2286,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
"UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1;",
},
- values:   [][]interface{}{{3, 3, 2, 2}},
- rowCount: 1,
+ values:          [][]interface{}{{3, 3, 2, 2}},
+ rowCount:        1,
+ approximateSize: 84,
},
}, {
name: "update with PK",
@@ -2321,8 +2326,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
startTs: []model.Ts{418658114257813516},
sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1;"},
- values:   [][]interface{}{{3, 3, 2, 2}},
- rowCount: 1,
+ values:          [][]interface{}{{3, 3, 2, 2}},
+ rowCount:        1,
+ approximateSize: 73,
},
}, {
name: "batch insert with PK",
@@ -2368,8 +2374,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
},
- values:   [][]interface{}{{3, 3}, {5, 5}},
- rowCount: 2,
+ values:          [][]interface{}{{3, 3}, {5, 5}},
+ rowCount:        2,
+ approximateSize: 104,
},
}, {
name: "safe mode on commit ts < replicating ts",
@@ -2397,8 +2404,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
sqls: []string{
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
},
- values:   [][]interface{}{{3, 3}},
- rowCount: 1,
+ values:          [][]interface{}{{3, 3}},
+ rowCount:        1,
+ approximateSize: 53,
},
}, {
name: "safe mode on one row commit ts < replicating ts",
@@ -2444,8 +2452,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
},
- values:   [][]interface{}{{3, 3}, {5, 5}},
- rowCount: 2,
+ values:          [][]interface{}{{3, 3}, {5, 5}},
+ rowCount:        2,
+ approximateSize: 106,
},
},
}
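The expected approximateSize values in these cases are simply len(query) summed over the batch, because the test fixtures leave ApproximateDataSize at zero. A quick check of two of them:

```go
package main

import "fmt"

func main() {
	// Byte length of the generated SQL is the whole expectation here,
	// since the fixture rows carry no approximate data size.
	insertPK := "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"
	fmt.Println(len(insertPK))     // 52 — the "insert with PK" expectation
	fmt.Println(2 * len(insertPK)) // 104 — the "batch insert with PK" expectation
}
```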
14 changes: 8 additions & 6 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
@@ -230,23 +230,24 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
var callbacks []func()
buf := bytes.NewBuffer(make([]byte, 0, task.size))
rowsCnt := 0
+ bytesCnt := int64(0)
for _, msg := range task.msgs {
- d.metricWriteBytes.Add(float64(len(msg.Value)))
+ bytesCnt += int64(len(msg.Value))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
callbacks = append(callbacks, msg.Callback)
}

- if err := d.statistics.RecordBatchExecution(func() (_ int, inErr error) {
+ if err := d.statistics.RecordBatchExecution(func() (_ int, _ int64, inErr error) {
if d.config.FlushConcurrency <= 1 {
- return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
+ return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
- return 0, inErr
+ return 0, 0, inErr
}

defer func() {
@@ -263,13 +264,14 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
- return 0, inErr
+ return 0, 0, inErr
}
- return rowsCnt, nil
+ return rowsCnt, bytesCnt, nil
}); err != nil {
return err
}

+ d.metricWriteBytes.Add(float64(bytesCnt))
d.metricFileCount.Add(1)
for _, cb := range callbacks {
if cb != nil {
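Note the bookkeeping change in the cloud-storage worker: bytes are now summed into a local bytesCnt while buffering, and both the statistics counter and metricWriteBytes are updated only after the flush succeeds, where previously metricWriteBytes grew inside the loop even if the write later failed. A compressed sketch of the success-only pattern (flushFile and its write callback are stand-ins for the storage calls):

```go
package main

import (
	"errors"
	"fmt"
)

// flushFile buffers the messages, sums their sizes, and reports rows and
// bytes only when the underlying write succeeds.
func flushFile(msgs [][]byte, write func([]byte) error) (rows int, bytes int64, err error) {
	var buf []byte
	for _, m := range msgs {
		bytes += int64(len(m))
		buf = append(buf, m...)
	}
	if err := write(buf); err != nil {
		return 0, 0, err // failed flushes contribute nothing to the counters
	}
	return len(msgs), bytes, nil
}

func main() {
	rows, bytes, err := flushFile([][]byte{[]byte("a"), []byte("bc")}, func([]byte) error { return nil })
	fmt.Println(rows, bytes, err) // 2 3 <nil>
	_, _, err = flushFile(nil, func([]byte) error { return errors.New("storage unavailable") })
	fmt.Println(err) // storage unavailable
}
```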
6 changes: 3 additions & 3 deletions cdc/sinkv2/eventsink/mq/worker.go
@@ -291,11 +291,11 @@ func (w *worker) sendMessages(ctx context.Context) error {
}
for _, message := range future.Messages {
start := time.Now()
- if err := w.statistics.RecordBatchExecution(func() (int, error) {
+ if err := w.statistics.RecordBatchExecution(func() (int, int64, error) {
if err := w.producer.AsyncSendMessage(ctx, future.Topic, future.Partition, message); err != nil {
- return 0, err
+ return 0, 0, err
}
- return message.GetRowsCount(), nil
+ return message.GetRowsCount(), int64(message.Length()), nil
}); err != nil {
return err
}
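The sinkv2 MQ worker takes the same accounting approach as the flush worker sketched earlier, with one difference visible in the hunk above: here RecordBatchExecution wraps a single message send inside the loop over future.Messages, so each successful send is recorded individually rather than once per batch.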