diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 9941004e0c2..7d2befeebdf 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -11606,10 +11606,15 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The count of the corresponding schedule commands which PD sends to each TiKV instance", + "description": "The count of the heartbeats which are pending in the task queue.", "editable": true, "error": false, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, "fill": 0, + "fillGradient": 0, "grid": {}, "gridPos": { "h": 8, @@ -11617,6 +11622,241 @@ "x": 12, "y": 39 }, + "hiddenSeries": false, + "id": 1608, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{task_type}}_({{ runner_name}})", + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Heartbeat Runner Pending Task", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:505", + "format":
"opm", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "$$hashKey": "object:506", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The count of the heartbeats which failed in the task queue.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 47 + }, + "hiddenSeries": false, + "id": 1609, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "$$hashKey": "object:243", + "alias": "/max-wait-duration.*/", + "bars": true, + "lines": false, + "transform": "negative-Y", + "yaxis": 2 + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "failed-tasks-({{runner_name}})", + "refId": "A", + "step": 4 + }, + { + "exemplar": true, + "expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "hide": false, + "interval": "", + 
"legendFormat": "max-wait-duration-({{runner_name}})", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Concurrent Runner Failed Task", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:201", + "decimals": null, + "format": "opm", + "label": "", + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "$$hashKey": "object:202", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The count of the corresponding schedule commands which PD sends to each TiKV instance", + "editable": true, + "error": false, + "fill": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 47 + }, "id": 1305, "legend": { "alignAsTable": true, @@ -11709,7 +11949,7 @@ "h": 8, "w": 12, "x": 0, - "y": 47 + "y": 55 }, "id": 1306, "legend": { @@ -11799,7 +12039,7 @@ "h": 8, "w": 12, "x": 12, - "y": 47 + "y": 55 }, "id": 1307, "legend": { @@ -11892,7 +12132,7 @@ "h": 8, "w": 12, "x": 0, - "y": 55 + "y": 63 }, "id": 1308, "legend": { @@ -11989,7 +12229,7 @@ "h": 8, "w": 12, "x": 12, - "y": 55 + "y": 63 }, "id": 1309, "legend": { @@ -12086,7 +12326,7 @@ "h": 8, "w": 12, "x": 0, - "y": 63 + "y": 71 }, "id": 1310, "legend": { @@ -12183,7 +12423,7 @@ "h": 8, "w": 12, "x": 12, - "y": 63 + "y": 71 }, "id": 1311, "legend": { @@ -12280,7 +12520,7 @@ "h": 8, "w": 12, "x": 0, - "y": 71 + "y": 79 }, "id": 1312, "legend": { diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 3c5020554a8..5d4443a1cc4 100644 --- 
a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -18,7 +18,10 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const nameStr = "runner_name" +const ( + nameStr = "runner_name" + taskStr = "task_type" +) var ( RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( @@ -35,7 +38,7 @@ var ( Subsystem: "ratelimit", Name: "runner_task_pending_tasks", Help: "The number of pending tasks in the runner.", - }, []string{nameStr}) + }, []string{nameStr, taskStr}) RunnerTaskFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index c4f2d5bc5ac..2d33b3f12ac 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -54,6 +54,7 @@ type ConcurrentRunner struct { pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup + pendingTaskCount map[string]int64 failedTaskCount prometheus.Counter maxWaitingDuration prometheus.Gauge } @@ -66,6 +67,7 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *Concurr taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), + pendingTaskCount: make(map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s @@ -109,6 +111,9 @@ func (s *ConcurrentRunner) Start() { if len(s.pendingTasks) > 0 { maxDuration = time.Since(s.pendingTasks[0].submittedAt) } + for name, cnt := range s.pendingTaskCount { + RunnerTaskPendingTasks.WithLabelValues(s.name, name).Set(float64(cnt)) + } s.pendingMu.Unlock() s.maxWaitingDuration.Set(maxDuration.Seconds()) } @@ -132,6 +137,7 @@ func (s *ConcurrentRunner) processPendingTasks() { select { case s.taskChan <- task: s.pendingTasks = s.pendingTasks[1:] + s.pendingTaskCount[task.Opts.TaskName]-- return default: return @@ -167,6 +173,7 @@ func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(con } task.submittedAt = time.Now() 
s.pendingTasks = append(s.pendingTasks, task) + s.pendingTaskCount[opt.TaskName]++ } return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dbc6a6cadf3..7188c75ba78 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -107,7 +107,7 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - hbConcurrentRunner = "heartbeat-async-task-runner" + hbConcurrentRunner = "heartbeat-async" ) // Server is the interface for cluster.