Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into sche-redirect3
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 committed Oct 8, 2023
2 parents 00950cb + f3ed1a0 commit d19578d
Show file tree
Hide file tree
Showing 19 changed files with 209 additions and 36 deletions.
20 changes: 18 additions & 2 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"sort"
"strings"
"sync/atomic"
"time"
"unsafe"

"github.com/docker/go-units"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
Expand Down Expand Up @@ -996,6 +998,11 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol

// UpdateSubTree updates the subtree.
func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*RegionInfo, rangeChanged bool) {
failpoint.Inject("UpdateSubTree", func() {
if origin == nil {
time.Sleep(time.Second)
}
})
r.st.Lock()
defer r.st.Unlock()
if origin != nil {
Expand All @@ -1004,8 +1011,17 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
r.updateSubTreeStat(origin, region)
r.subRegions[region.GetID()].RegionInfo = region
// The region tree and the subtree update is not atomic and the region tree is updated first.
// If there are two thread needs to update region tree,
// t1: thread-A update region tree
// t2: thread-B: update region tree again
// t3: thread-B: update subtree
// t4: thread-A: update region subtree
// to keep region tree consistent with subtree, we need to drop this update.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -450,6 +451,18 @@ func TestRegionKey(t *testing.T) {
}
}

func TestSetRegionConcurrence(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/core/UpdateSubTree", `return()`))
regions := NewRegionsInfo()
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
go func() {
regions.AtomicCheckAndPutRegion(region)
}()
regions.AtomicCheckAndPutRegion(region)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}

func TestSetRegion(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
Expand Down
31 changes: 28 additions & 3 deletions pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/reflectutil"
)

Expand All @@ -57,7 +59,7 @@ func init() {
// Service is the resource group service.
type Service struct {
apiHandlerEngine *gin.Engine
baseEndpoint *gin.RouterGroup
root *gin.RouterGroup

manager *rmserver.Manager
}
Expand Down Expand Up @@ -86,15 +88,22 @@ func NewService(srv *rmserver.Service) *Service {
s := &Service{
manager: manager,
apiHandlerEngine: apiHandlerEngine,
baseEndpoint: endpoint,
root: endpoint,
}
s.RegisterAdminRouter()
s.RegisterRouter()
return s
}

// RegisterAdminRouter registers the router of the TSO admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
router.PUT("/log", changeLogLevel)
}

// RegisterRouter registers the router of the service.
func (s *Service) RegisterRouter() {
configEndpoint := s.baseEndpoint.Group("/config")
configEndpoint := s.root.Group("/config")
configEndpoint.POST("/group", s.postResourceGroup)
configEndpoint.PUT("/group", s.putResourceGroup)
configEndpoint.GET("/group/:name", s.getResourceGroup)
Expand All @@ -110,6 +119,22 @@ func (s *Service) handler() http.Handler {
})
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*rmserver.Service)
var level string
if err := c.Bind(&level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 127 in pkg/mcs/resourcemanager/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/apis/v1/api.go#L123-L127

Added lines #L123 - L127 were not covered by tests
}

if err := svr.SetLogLevel(level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 132 in pkg/mcs/resourcemanager/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/apis/v1/api.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}
log.SetLevel(logutil.StringToZapLogLevel(level))
c.String(http.StatusOK, "The log level is updated.")

Check warning on line 135 in pkg/mcs/resourcemanager/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/apis/v1/api.go#L134-L135

Added lines #L134 - L135 were not covered by tests
}

// postResourceGroup
//
// @Tags ResourceManager
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Service is the gRPC service for resource manager.
type Service struct {
ctx context.Context
ctx context.Context
*Server
manager *Manager
// settings
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/resource_manager"
Expand Down Expand Up @@ -86,6 +87,17 @@ func (s *Server) GetAddr() string {
return s.cfg.ListenAddr
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
return errors.Errorf("log level %s is illegal", level)

Check warning on line 93 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}
s.cfg.Log.Level = level
log.SetLevel(logutil.StringToZapLogLevel(level))
log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
return nil

Check warning on line 98 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L95-L98

Added lines #L95 - L98 were not covered by tests
}

// Run runs the Resource Manager server.
func (s *Server) Run() (err error) {
skipWaitAPIServiceReady := false
Expand Down
25 changes: 25 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/unrolled/render"
)

Expand Down Expand Up @@ -109,12 +111,19 @@ func NewService(srv *scheserver.Service) *Service {
root: root,
rd: createIndentRender(),
}
s.RegisterAdminRouter()
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
return s
}

// RegisterAdminRouter registers the router of the admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
router.PUT("/log", changeLogLevel)
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
Expand Down Expand Up @@ -142,6 +151,22 @@ func (s *Service) RegisterOperatorsRouter() {
router.GET("/records", getOperatorRecords)
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var level string
if err := c.Bind(&level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 159 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L155-L159

Added lines #L155 - L159 were not covered by tests
}

if err := svr.SetLogLevel(level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 164 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L162-L164

Added lines #L162 - L164 were not covered by tests
}
log.SetLevel(logutil.StringToZapLogLevel(level))
c.String(http.StatusOK, "The log level is updated.")

Check warning on line 167 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L166-L167

Added lines #L166 - L167 were not covered by tests
}

// @Tags operators
// @Summary Get an operator by ID.
// @Param region_id path int true "A Region's Id"
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
log.Info("update scheduling config", zap.Reflect("new", cfg))
cw.AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
Expand Down Expand Up @@ -146,6 +147,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.String("name", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
Expand All @@ -161,6 +163,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("remove scheduler config", zap.String("key", string(kv.Key)))
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
)
Expand Down
8 changes: 8 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"strings"
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// ruleStorage is an in-memory storage for Placement Rules,
Expand Down Expand Up @@ -163,12 +165,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand All @@ -188,12 +192,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRuleGroup(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand All @@ -213,12 +219,14 @@ func (rw *Watcher) initializeGroupWatcher() error {
func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRegionRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand Down
12 changes: 12 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -119,6 +120,17 @@ func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
return errors.Errorf("log level %s is illegal", level)

Check warning on line 126 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L125-L126

Added lines #L125 - L126 were not covered by tests
}
s.cfg.Log.Level = level
log.SetLevel(logutil.StringToZapLogLevel(level))
log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
return nil

Check warning on line 131 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L128-L131

Added lines #L128 - L131 were not covered by tests
}

// Run runs the scheduling server.
func (s *Server) Run() error {
skipWaitAPIServiceReady := false
Expand Down
18 changes: 18 additions & 0 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -107,6 +108,7 @@ func NewService(srv *tsoserver.Service) *Service {
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
router.POST("/reset-ts", ResetTS)
router.PUT("/log", changeLogLevel)
}

// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
Expand All @@ -115,6 +117,22 @@ func (s *Service) RegisterKeyspaceGroupRouter() {
router.GET("/members", GetKeyspaceGroupMembers)
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
var level string
if err := c.Bind(&level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 125 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L121-L125

Added lines #L121 - L125 were not covered by tests
}

if err := svr.SetLogLevel(level); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 130 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L128-L130

Added lines #L128 - L130 were not covered by tests
}
log.SetLevel(logutil.StringToZapLogLevel(level))
c.String(http.StatusOK, "The log level is updated.")

Check warning on line 133 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L132-L133

Added lines #L132 - L133 were not covered by tests
}

// ResetTSParams is the input json body params of ResetTS
type ResetTSParams struct {
TSO string `json:"tso"`
Expand Down
Loading

0 comments on commit d19578d

Please sign in to comment.