diff --git a/client/go.mod b/client/go.mod index 475cf716125..7c782695539 100644 --- a/client/go.mod +++ b/client/go.mod @@ -33,6 +33,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect diff --git a/client/go.sum b/client/go.sum index 620f70007a7..8f85f5ce7ed 100644 --- a/client/go.sum +++ b/client/go.sum @@ -68,6 +68,7 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= diff --git a/client/metrics.go b/client/metrics.go index e0b29fb8bcc..f3c47d7e787 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -105,7 +105,7 @@ func initMetrics(constLabels prometheus.Labels) { Subsystem: "request", Name: "tso_batch_send_latency", ConstLabels: constLabels, - Buckets: prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), Help: "tso batch send latency", }) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 547c7f70921..d25eaee88f0 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -330,9 +330,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case notifyMsg := <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) - } + c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } @@ -1179,11 +1177,19 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType switch selectTyp { case periodicReport: selected = selected || gc.shouldReportConsumption() + failpoint.Inject("triggerPeriodicReport", func(val failpoint.Value) { + selected = gc.name == val.(string) + }) fallthrough case lowToken: if counter.limiter.IsLowTokens() { selected = true } + failpoint.Inject("triggerLowRUReport", func(val failpoint.Value) { + if selectTyp == lowToken { + selected = gc.name == val.(string) + } + }) } request := &rmpb.RequestUnitItem{ Type: typ, diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 9bbe34aaf7e..3300edf700f 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -24,8 +24,12 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/meta_storagepb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" ) @@ -132,3 +136,138 @@ func TestResourceGroupThrottledError(t *testing.T) { re.Error(err) re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) } + +// MockResourceGroupProvider is a mock implementation of the ResourceGroupProvider interface. +type MockResourceGroupProvider struct { + mock.Mock +} + +func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) { + args := m.Called(ctx, resourceGroupName, opts) + return args.Get(0).(*rmpb.ResourceGroup), args.Error(1) +} + +func (m *MockResourceGroupProvider) ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error) { + args := m.Called(ctx, opts) + return args.Get(0).([]*rmpb.ResourceGroup), args.Error(1) +} + +func (m *MockResourceGroupProvider) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { + args := m.Called(ctx, metaGroup) + return args.String(0), args.Error(1) +} + +func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { + args := m.Called(ctx, metaGroup) + return args.String(0), args.Error(1) +} + +func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { + args := m.Called(ctx, resourceGroupName) + return args.String(0), args.Error(1) +} + +func (m *MockResourceGroupProvider) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { + args := m.Called(ctx, request) + return args.Get(0).([]*rmpb.TokenBucketResponse), args.Error(1) +} + +func (m *MockResourceGroupProvider) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { + args := m.Called(ctx) + return args.Get(0).([]*rmpb.ResourceGroup), args.Get(1).(int64), args.Error(2) +} + +func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) { + args := m.Called(ctx, key, opts) + return args.Get(0).(chan []*meta_storagepb.Event), args.Error(1) +} + +func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { + args := m.Called(ctx, key, opts) + return args.Get(0).(*meta_storagepb.GetResponse), args.Error(1) +} + +func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockProvider := new(MockResourceGroupProvider) + + mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil) + // LoadResourceGroups + mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil) + // Watch + mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil) + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default"))) + defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport") + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group"))) + defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport") + + controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) + controller.Start(ctx) + + defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) + + c1, err := controller.tryGetResourceGroup(ctx, "default") + re.NoError(err) + re.Equal(defaultResourceGroup, c1.meta) + + c2, err := controller.tryGetResourceGroup(ctx, "test-group") + re.NoError(err) + re.Equal(testResourceGroup, c2.meta) + + var expectResp []*rmpb.TokenBucketResponse + recTestGroupAcquireTokenRequest := make(chan bool) + mockProvider.On("AcquireTokenBuckets", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + request := args.Get(1).(*rmpb.TokenBucketsRequest) + var responses []*rmpb.TokenBucketResponse + for _, req := range request.Requests { + if req.ResourceGroupName == "default" { + // no response the default group request, that's mean `len(c.run.currentRequests) != 0` always. + time.Sleep(100 * time.Second) + responses = append(responses, &rmpb.TokenBucketResponse{ + ResourceGroupName: "default", + GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ + { + GrantedTokens: &rmpb.TokenBucket{ + Tokens: 100000, + }, + }, + }, + }) + } else { + responses = append(responses, &rmpb.TokenBucketResponse{ + ResourceGroupName: req.ResourceGroupName, + GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ + { + GrantedTokens: &rmpb.TokenBucket{ + Tokens: 100000, + }, + }, + }, + }) + } + } + // receive test-group request + if len(request.Requests) == 1 && request.Requests[0].ResourceGroupName == "test-group" { + recTestGroupAcquireTokenRequest <- true + } + expectResp = responses + }).Return(expectResp, nil) + // wait default group request token by PeriodicReport. + time.Sleep(2 * time.Second) + counter := c2.run.requestUnitTokens[0] + counter.limiter.mu.Lock() + counter.limiter.notify() + counter.limiter.mu.Unlock() + select { + case res := <-recTestGroupAcquireTokenRequest: + re.True(res) + case <-time.After(5 * time.Second): + re.Fail("timeout") + } +} diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index d8f9ba12592..15364989cd7 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -44,6 +44,18 @@ var ( t8 = t0.Add(time.Duration(8) * d) ) +func resetTime() { + t0 = time.Now() + t1 = t0.Add(time.Duration(1) * d) + t2 = t0.Add(time.Duration(2) * d) + t3 = t0.Add(time.Duration(3) * d) + t4 = t0.Add(time.Duration(4) * d) + t5 = t0.Add(time.Duration(5) * d) + t6 = t0.Add(time.Duration(6) * d) + t7 = t0.Add(time.Duration(7) * d) + t8 = t0.Add(time.Duration(8) * d) +} + type request struct { t time.Time n float64 @@ -144,6 +156,7 @@ func TestNotify(t *testing.T) { } func TestCancel(t *testing.T) { + resetTime() ctx := context.Background() ctx1, cancel1 := context.WithDeadline(ctx, t2) re := require.New(t) @@ -161,8 +174,8 @@ func TestCancel(t *testing.T) { checkTokens(re, lim1, t2, 7) checkTokens(re, lim2, t2, 2) d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2}) - re.Equal(4*time.Second, d) re.Error(err) + re.Equal(4*time.Second, d) re.Contains(err.Error(), "estimated wait time 4s, ltb state is 1.00:-4.00") checkTokens(re, lim1, t3, 13) checkTokens(re, lim2, t3, 3) diff --git a/client/tso_stream.go b/client/tso_stream.go index 14b72bc697b..dd5b9422aae 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -139,7 +139,7 @@ func (s *pdTSOStream) processRequests( } return } - tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime))) + tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) resp, err := s.stream.Recv() if err != nil { if err == io.EOF { @@ -195,7 +195,7 @@ func (s *tsoTSOStream) processRequests( } return } - tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime))) + tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) resp, err := s.stream.Recv() if err != nil { if err == io.EOF { diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index abfe049b905..ced79c5b9e0 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1655,12 +1655,20 @@ ], "targets": [ { - "expr": "pd_cluster_placement_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "expr": "sum(pd_cluster_placement_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}) by (name)", "format": "time_series", "hide": false, "intervalFactor": 2, "legendFormat": "{{name}}", "refId": "A" + }, + { + "expr": "pd_cluster_placement_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "format": "time_series", + "hide": true, + "intervalFactor": 2, + "legendFormat": "{{name}}--{{store}}", + "refId": "B" } ], "timeFrom": "1s", diff --git a/pkg/statistics/metrics.go b/pkg/statistics/metrics.go index a5ea07f4f55..68cbf142479 100644 --- a/pkg/statistics/metrics.go +++ b/pkg/statistics/metrics.go @@ -55,7 +55,7 @@ var ( Subsystem: "cluster", Name: "placement_status", Help: "Status of the cluster placement.", - }, []string{"type", "name"}) + }, []string{"type", "name", "store"}) configStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 4f76ffb0b5f..6d5df0bda62 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -47,7 +47,7 @@ type storeStatistics struct { LeaderCount int LearnerCount int WitnessCount int - LabelCounter map[string]int + LabelCounter map[string][]uint64 Preparing int Serving int Removing int @@ -57,7 +57,7 @@ type storeStatistics struct { func newStoreStatistics(opt config.ConfProvider) *storeStatistics { return &storeStatistics{ opt: opt, - LabelCounter: make(map[string]int), + LabelCounter: make(map[string][]uint64), } } @@ -70,7 +70,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { key := fmt.Sprintf("%s:%s", k, v) // exclude tombstone if !store.IsRemoved() { - s.LabelCounter[key]++ + s.LabelCounter[key] = append(s.LabelCounter[key], store.GetID()) } } storeAddress := store.GetAddress() @@ -249,8 +249,10 @@ func (s *storeStatistics) Collect() { configStatusGauge.WithLabelValues(typ).Set(value) } - for name, value := range s.LabelCounter { - placementStatusGauge.WithLabelValues(labelType, name).Set(float64(value)) + for name, stores := range s.LabelCounter { + for _, storeID := range stores { + placementStatusGauge.WithLabelValues(labelType, name, strconv.FormatUint(storeID, 10)).Set(1) + } } for storeID, limit := range s.opt.GetStoresLimit() { diff --git a/pkg/statistics/store_collection_test.go b/pkg/statistics/store_collection_test.go index 64a02a54bb4..e9fd1bba1fb 100644 --- a/pkg/statistics/store_collection_test.go +++ b/pkg/statistics/store_collection_test.go @@ -85,12 +85,14 @@ func TestStoreStatistics(t *testing.T) { re.Equal(0, stats.Disconnect) re.Equal(1, stats.Tombstone) re.Equal(1, stats.LowSpace) - re.Equal(2, stats.LabelCounter["zone:z1"]) - re.Equal(2, stats.LabelCounter["zone:z2"]) - re.Equal(2, stats.LabelCounter["zone:z3"]) - re.Equal(4, stats.LabelCounter["host:h1"]) - re.Equal(4, stats.LabelCounter["host:h2"]) - re.Equal(2, stats.LabelCounter["zone:unknown"]) + re.Len(stats.LabelCounter["zone:z1"], 2) + re.Equal([]uint64{1, 2}, stats.LabelCounter["zone:z1"]) + re.Len(stats.LabelCounter["zone:z2"], 2) + re.Len(stats.LabelCounter["zone:z3"], 2) + re.Len(stats.LabelCounter["host:h1"], 4) + re.Equal([]uint64{1, 3, 5, 7}, stats.LabelCounter["host:h1"]) + re.Len(stats.LabelCounter["host:h2"], 4) + re.Len(stats.LabelCounter["zone:unknown"], 2) } func TestSummaryStoreInfos(t *testing.T) { diff --git a/server/metrics.go b/server/metrics.go index 0935008a420..fdcc5b4be22 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -45,7 +45,7 @@ var ( Subsystem: "scheduler", Name: "region_heartbeat_latency_seconds", Help: "Bucketed histogram of latency (s) of receiving heartbeat.", - Buckets: prometheus.ExponentialBuckets(1, 2, 12), + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), }, []string{"address", "store"}) metadataGauge = prometheus.NewGaugeVec(