diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index ed2a5442908..61df2fbbb56 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -16,6 +16,7 @@ import ( "bytes" "context" "path" + "strconv" "sync/atomic" "time" @@ -127,7 +128,7 @@ func newDMLWorker( metricFlushDuration: mcloudstorage.CloudStorageFlushDurationHistogram. WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricsWorkerBusyRatio: mcloudstorage.CloudStorageWorkerBusyRatioCounter. - WithLabelValues(changefeedID.Namespace, changefeedID.ID), + WithLabelValues(changefeedID.Namespace, changefeedID.ID, strconv.Itoa(id)), } return d @@ -155,7 +156,7 @@ func (d *dmlWorker) run(ctx context.Context) error { // active means that a table has events since last flushing. func (d *dmlWorker) flushMessages(ctx context.Context) error { var flushTimeSlice, totalTimeSlice time.Duration - overseerTicker := time.NewTicker(time.Second) + overseerTicker := time.NewTicker(d.config.FlushInterval * 2) defer overseerTicker.Stop() startToWork := time.Now() for { @@ -164,8 +165,8 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error { return errors.Trace(ctx.Err()) case now := <-overseerTicker.C: totalTimeSlice = now.Sub(startToWork) - busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000) - d.metricsWorkerBusyRatio.Add(float64(busyRatio) / float64(d.config.WorkerCount)) + busyRatio := float64(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000) + d.metricsWorkerBusyRatio.Add(busyRatio) startToWork = now flushTimeSlice = 0 case task := <-d.flushNotifyCh: diff --git a/cdc/sink/metrics/cloudstorage/metrics.go b/cdc/sink/metrics/cloudstorage/metrics.go index e96b14ef9ce..52905b71346 100644 --- a/cdc/sink/metrics/cloudstorage/metrics.go +++ b/cdc/sink/metrics/cloudstorage/metrics.go @@ -62,7 +62,7 @@ var ( Subsystem: subsystem, Name: "cloud_storage_worker_busy_ratio", Help: "Busy ratio (X ms in 1s) for cloud storage sink dml worker.", - }, []string{"namespace", "changefeed"}) + }, []string{"namespace", "changefeed", "id"}) ) // InitMetrics registers all metrics in this file.