Skip to content

Commit

Permalink
Merge branch 'master' into fix-missing-error
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jul 2, 2024
2 parents 18fad8b + b3bccce commit 136fc44
Show file tree
Hide file tree
Showing 66 changed files with 850 additions and 276 deletions.
11 changes: 8 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ linters-settings:
severity: warning
disabled: false
exclude: [""]
errcheck:
exclude-functions:
- (*github.com/unrolled/render.Render).JSON
- (*github.com/unrolled/render.Render).Data
- (*github.com/unrolled/render.Render).Text
- (net/http.ResponseWriter).Write
- github.com/pingcap/log.Sync
- (github.com/tikv/pd/pkg/ratelimit.Runner).RunTask
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
Expand All @@ -207,6 +215,3 @@ issues:
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/syncer/server.go)
linters:
- errcheck
8 changes: 6 additions & 2 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,12 @@ func start(cmd *cobra.Command, args []string, services ...string) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
flagSet.Parse(args)
err := cfg.Parse(flagSet)
err := flagSet.Parse(args)
if err != nil {
cmd.Println(err)
return
}
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
# key-path = ""
## A CN which must be provided by a client
# cert-allowed-cn = ["example.com"]
## Whether or not to enable redact log.
## Whether to enable the log redaction. It can be the following values:
## - false: disable redact log.
## - true: enable redact log, which will replace the sensitive information with "?".
## - "MARKER": enable redact log, which will use single guillemets ‹› to enclose the sensitive information.
# redact-info-log = false

[security.encryption]
Expand Down
8 changes: 4 additions & 4 deletions pkg/autoscaling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,22 @@ func NewHTTPHandler(svr *server.Server, rd *render.Render) *HTTPHandler {
func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rc := h.svr.GetRaftCluster()
if rc == nil {
_ = h.rd.JSON(w, http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
h.rd.JSON(w, http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
return
}
data, err := io.ReadAll(r.Body)
r.Body.Close()
if err != nil {
_ = h.rd.JSON(w, http.StatusInternalServerError, err.Error())
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

strategy := Strategy{}
if err := json.Unmarshal(data, &strategy); err != nil {
_ = h.rd.JSON(w, http.StatusBadRequest, err.Error())
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

plan := calculate(rc, h.svr.GetPDServerConfig(), &strategy)
_ = h.rd.JSON(w, http.StatusOK, plan)
h.rd.JSON(w, http.StatusOK, plan)
}
21 changes: 10 additions & 11 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
CollectFactor = 0.9
)

// errRegionIsStale is error info for region is stale.
Expand Down Expand Up @@ -746,7 +747,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
regionID := region.GetID()
if logRunner != nil {
debug = func(msg string, fields ...zap.Field) {
_ = logRunner.RunTask(
logRunner.RunTask(
regionID,
"DebugLog",
func() {
Expand All @@ -755,7 +756,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
info = func(msg string, fields ...zap.Field) {
_ = logRunner.RunTask(
logRunner.RunTask(
regionID,
"InfoLog",
func() {
Expand Down Expand Up @@ -1583,6 +1584,12 @@ func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
}

// IsStorePrepared checks if a store is prepared.
// For each store, the number of active regions should be more than total region of the store * CollectFactor
func (r *RegionsInfo) IsStorePrepared(storeID uint64) bool {
return float64(r.GetNotFromStorageRegionsCntByStore(storeID)) >= float64(r.GetStoreRegionCount(storeID))*CollectFactor
}

// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
Expand Down Expand Up @@ -2042,14 +2049,6 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string {
return strings.Join(ret, ", ")
}

// String converts slice of bytes to string without copy.
func String(b []byte) string {
if len(b) == 0 {
return ""
}
return unsafe.String(unsafe.SliceData(b), len(b))
}

// ToUpperASCIIInplace bytes.ToUpper but zero-cost
func ToUpperASCIIInplace(s []byte) []byte {
hasLower := false
Expand Down Expand Up @@ -2088,7 +2087,7 @@ func HexRegionKey(key []byte) []byte {
// HexRegionKeyStr converts region key to hex format. Used for formatting region in
// logs.
func HexRegionKeyStr(key []byte) string {
return String(HexRegionKey(key))
return typeutil.BytesToString(HexRegionKey(key))
}

// RegionToHexMeta converts a region meta's keys to hex format. Used for formatting
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

func (s *Service) checkServing() error {
Expand Down
16 changes: 11 additions & 5 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type ServiceBuilder func(bs.Server) RegistrableService
// RegistrableService is the interface that should wraps the RegisterService method.
type RegistrableService interface {
RegisterGRPCService(g *grpc.Server)
RegisterRESTHandler(userDefineHandlers map[string]http.Handler)
RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error
}

// ServiceRegistry is a map that stores all registered grpc services.
Expand Down Expand Up @@ -82,14 +82,20 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http
for name, builder := range r.builders {
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
l.RegisterRESTHandler(h)
log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
}
continue
}
l := builder(srv)
r.services[serviceName] = l
l.RegisterRESTHandler(h)
log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func (c *Config) Adjust(meta *toml.MetaData) error {
}

c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()
if err := c.Security.Encryption.Adjust(); err != nil {
return err
}

c.Controller.Adjust(configMetaData.Child("controller"))
configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)
Expand Down
8 changes: 5 additions & 3 deletions pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

// GetManager returns the resource manager.
Expand Down Expand Up @@ -228,6 +228,8 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
log.Debug("finish token request from", zap.String("resource-group", resourceGroupName))
resps.Responses = append(resps.Responses, resp)
}
stream.Send(resps)
if err := stream.Send(resps); err != nil {
return errors.WithStack(err)
}
}
}
4 changes: 3 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,9 @@ func (m *Manager) persistResourceGroupRunningState() {
m.RLock()
group, ok := m.groups[keys[idx]]
if ok {
group.persistStates(m.storage)
if err := group.persistStates(m.storage); err != nil {
log.Error("persist resource group state failed", zap.Error(err))
}
}
m.RUnlock()
}
Expand Down
19 changes: 13 additions & 6 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ func (s *Server) campaignLeader() {

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
cb(ctx)
if err := cb(ctx); err != nil {
log.Error("failed to trigger the primary callback function", errs.ZapError(err))
}
}

s.participant.EnableLeader()
Expand Down Expand Up @@ -213,7 +215,9 @@ func (s *Server) Close() {
}

log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
if err := s.serviceRegister.Deregister(); err != nil {
log.Error("failed to deregister the service", errs.ZapError(err))
}
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
Expand Down Expand Up @@ -362,10 +366,14 @@ func CreateServer(ctx context.Context, cfg *Config) *Server {

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(cmd *cobra.Command, args []string) {
cmd.Flags().Parse(args)
err := cmd.Flags().Parse(args)
if err != nil {
cmd.Println(err)
return
}
cfg := NewConfig()
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
Expand All @@ -389,8 +397,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()

log.Sync()
versioninfo.Log(serviceName)
log.Info("resource manager config", zap.Reflect("config", cfg))

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*S
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()
log.Sync()

s := CreateServer(ctx, cfg)
if err = s.Run(); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
_ = ctx.TaskRunner.RunTask(
ctx.TaskRunner.RunTask(
regionID,
ratelimit.ObserveRegionStatsAsync,
func() {
Expand All @@ -636,7 +636,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
_ = ctx.TaskRunner.RunTask(
ctx.TaskRunner.RunTask(
regionID,
ratelimit.UpdateSubTree,
func() {
Expand All @@ -660,7 +660,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
tracer.OnSaveCacheFinished()
return err
}
_ = ctx.TaskRunner.RunTask(
ctx.TaskRunner.RunTask(
regionID,
ratelimit.UpdateSubTree,
func() {
Expand All @@ -669,7 +669,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
_ = ctx.TaskRunner.RunTask(
ctx.TaskRunner.RunTask(
regionID,
ratelimit.HandleOverlaps,
func() {
Expand All @@ -679,7 +679,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
tracer.OnSaveCacheFinished()
// handle region stats
_ = ctx.TaskRunner.RunTask(
ctx.TaskRunner.RunTask(
regionID,
ratelimit.CollectRegionStatsAsync,
func() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ func (c *Config) adjust(meta *toml.MetaData) error {
}

c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()
if err := c.Security.Encryption.Adjust(); err != nil {
return err
}

configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)

Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader {
Expand Down
14 changes: 10 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ func (s *Server) Close() {
}

log.Info("closing scheduling server ...")
s.serviceRegister.Deregister()
if err := s.serviceRegister.Deregister(); err != nil {
log.Error("failed to deregister the service", errs.ZapError(err))
}
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
Expand Down Expand Up @@ -563,10 +565,14 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server {
// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cmd.Flags().Parse(args)
err := cmd.Flags().Parse(args)
if err != nil {
cmd.Println(err)
return
}
cfg := config.NewConfig()
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
Expand All @@ -590,7 +596,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()
log.Sync()

versioninfo.Log(serviceName)
log.Info("scheduling service config", zap.Reflect("config", cfg))
Expand Down
Loading

0 comments on commit 136fc44

Please sign in to comment.