Skip to content

Commit

Permalink
Merge branch 'master' into add-log
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Oct 8, 2023
2 parents 08d0bf3 + 9556040 commit 9baff86
Show file tree
Hide file tree
Showing 52 changed files with 1,238 additions and 686 deletions.
5 changes: 3 additions & 2 deletions pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
package balancer

import (
"sync"
"sync/atomic"

"github.com/tikv/pd/pkg/utils/syncutil"
)

// RoundRobin is a balancer that selects nodes in a round-robin fashion.
type RoundRobin[T uint32 | string] struct {
sync.RWMutex
syncutil.RWMutex
nodes []T
exists map[T]struct{}
next uint32
Expand Down
5 changes: 3 additions & 2 deletions pkg/btree/btree_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ package btree

import (
"sort"
"sync"

"github.com/tikv/pd/pkg/utils/syncutil"
)

// Item represents a single object in the tree.
Expand All @@ -101,7 +102,7 @@ const (
// FreeList, in particular when they're created with Clone.
// Two Btrees using the same freelist are safe for concurrent write access.
type FreeListG[T Item[T]] struct {
mu sync.Mutex
mu syncutil.Mutex
freelist []*node[T]
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/btree/btree_generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"sort"
"sync"
"testing"

"github.com/tikv/pd/pkg/utils/syncutil"
)

// perm returns a random permutation of n Int items in the range [0, n).
Expand Down Expand Up @@ -752,7 +754,7 @@ func BenchmarkDescendLessOrEqual(b *testing.B) {

const cloneTestSize = 10000

func cloneTestG[T Item[T]](t *testing.T, b *BTreeG[T], start int, p []T, wg *sync.WaitGroup, trees *[]*BTreeG[T], lock *sync.Mutex) {
func cloneTestG[T Item[T]](t *testing.T, b *BTreeG[T], start int, p []T, wg *sync.WaitGroup, trees *[]*BTreeG[T], lock *syncutil.Mutex) {
t.Logf("Starting new clone at %v", start)
lock.Lock()
*trees = append(*trees, b)
Expand All @@ -773,7 +775,7 @@ func TestCloneConcurrentOperationsG(t *testing.T) {
p := perm(cloneTestSize)
var wg sync.WaitGroup
wg.Add(1)
go cloneTestG(t, b, 0, p, &wg, &trees, &sync.Mutex{})
go cloneTestG(t, b, 0, p, &wg, &trees, &syncutil.Mutex{})
wg.Wait()
want := rang(cloneTestSize)
t.Logf("Starting equality checks on %d trees", len(trees))
Expand Down
20 changes: 18 additions & 2 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"sort"
"strings"
"sync/atomic"
"time"
"unsafe"

"github.com/docker/go-units"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
Expand Down Expand Up @@ -996,6 +998,11 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol

// UpdateSubTree updates the subtree.
func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*RegionInfo, rangeChanged bool) {
failpoint.Inject("UpdateSubTree", func() {
if origin == nil {
time.Sleep(time.Second)
}
})
r.st.Lock()
defer r.st.Unlock()
if origin != nil {
Expand All @@ -1004,8 +1011,17 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
r.updateSubTreeStat(origin, region)
r.subRegions[region.GetID()].RegionInfo = region
// The region tree and the subtree update is not atomic and the region tree is updated first.
// If there are two thread needs to update region tree,
// t1: thread-A update region tree
// t2: thread-B: update region tree again
// t3: thread-B: update subtree
// t4: thread-A: update region subtree
// to keep region tree consistent with subtree, we need to drop this update.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -450,6 +451,18 @@ func TestRegionKey(t *testing.T) {
}
}

func TestSetRegionConcurrence(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/core/UpdateSubTree", `return()`))
regions := NewRegionsInfo()
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
go func() {
regions.AtomicCheckAndPutRegion(region)
}()
regions.AtomicCheckAndPutRegion(region)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}

func TestSetRegion(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
Expand Down
4 changes: 2 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package election

import (
"context"
"sync"
"sync/atomic"
"time"

Expand All @@ -27,6 +26,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -61,7 +61,7 @@ type Leadership struct {

keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock sync.Mutex
keepAliveCancelFuncLock syncutil.Mutex
}

// NewLeadership creates a new Leadership.
Expand Down
3 changes: 2 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand All @@ -60,7 +61,7 @@ type GroupManager struct {
client *clientv3.Client
clusterID uint64

sync.RWMutex
syncutil.RWMutex
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap
Expand Down
31 changes: 28 additions & 3 deletions pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/reflectutil"
)

Expand All @@ -57,7 +59,7 @@ func init() {
// Service is the resource group service.
type Service struct {
apiHandlerEngine *gin.Engine
baseEndpoint *gin.RouterGroup
root *gin.RouterGroup

manager *rmserver.Manager
}
Expand Down Expand Up @@ -86,15 +88,22 @@ func NewService(srv *rmserver.Service) *Service {
s := &Service{
manager: manager,
apiHandlerEngine: apiHandlerEngine,
baseEndpoint: endpoint,
root: endpoint,
}
s.RegisterAdminRouter()
s.RegisterRouter()
return s
}

// RegisterAdminRouter registers the router of the TSO admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
router.PUT("/log", changeLogLevel)
}

// RegisterRouter registers the router of the service.
func (s *Service) RegisterRouter() {
configEndpoint := s.baseEndpoint.Group("/config")
configEndpoint := s.root.Group("/config")
configEndpoint.POST("/group", s.postResourceGroup)
configEndpoint.PUT("/group", s.putResourceGroup)
configEndpoint.GET("/group/:name", s.getResourceGroup)
Expand All @@ -110,6 +119,22 @@ func (s *Service) handler() http.Handler {
})
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*rmserver.Service)
var level string
if err := c.Bind(&level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

if err := svr.SetLogLevel(level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
log.SetLevel(logutil.StringToZapLogLevel(level))
c.String(http.StatusOK, "The log level is updated.")
}

// postResourceGroup
//
// @Tags ResourceManager
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Service is the gRPC service for resource manager.
type Service struct {
ctx context.Context
ctx context.Context
*Server
manager *Manager
// settings
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math"
"sort"
"strings"
"sync"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/jsonutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

Expand All @@ -49,7 +49,7 @@ const (

// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
syncutil.RWMutex
srv bs.Server
controllerConfig *ControllerConfig
groups map[string]*ResourceGroup
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ package server

import (
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

// ResourceGroup is the definition of a resource group, for REST API.
type ResourceGroup struct {
sync.RWMutex
syncutil.RWMutex
Name string `json:"name"`
Mode rmpb.GroupMode `json:"mode"`
// RU settings
Expand Down
12 changes: 12 additions & 0 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/resource_manager"
Expand Down Expand Up @@ -86,6 +87,17 @@ func (s *Server) GetAddr() string {
return s.cfg.ListenAddr
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
return errors.Errorf("log level %s is illegal", level)
}
s.cfg.Log.Level = level
log.SetLevel(logutil.StringToZapLogLevel(level))
log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
return nil
}

// Run runs the Resource Manager server.
func (s *Server) Run() (err error) {
skipWaitAPIServiceReady := false
Expand Down
Loading

0 comments on commit 9baff86

Please sign in to comment.