diff --git a/.golangci.yml b/.golangci.yml index d4ce8edb65e..68cc82bd373 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -198,6 +198,14 @@ linters-settings: severity: warning disabled: false exclude: [""] + errcheck: + exclude-functions: + - (*github.com/unrolled/render.Render).JSON + - (*github.com/unrolled/render.Render).Data + - (*github.com/unrolled/render.Render).Text + - (net/http.ResponseWriter).Write + - github.com/pingcap/log.Sync + - (github.com/tikv/pd/pkg/ratelimit.Runner).RunTask issues: exclude-rules: - path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go) @@ -207,6 +215,3 @@ issues: - path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump) linters: - errcheck - - path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/syncer/server.go) - linters: - - errcheck diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index bd75309ed8a..553b93ed0ef 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -196,8 +196,12 @@ func start(cmd *cobra.Command, args []string, services ...string) { schedulers.Register() cfg := config.NewConfig() flagSet := cmd.Flags() - flagSet.Parse(args) - err := cfg.Parse(flagSet) + err := flagSet.Parse(args) + if err != nil { + cmd.Println(err) + return + } + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { diff --git a/conf/config.toml b/conf/config.toml index 20f664a4c85..438d2c857a5 100644 --- a/conf/config.toml +++ b/conf/config.toml @@ -33,7 +33,10 @@ # key-path = "" ## A CN which must be provided by a client # cert-allowed-cn = ["example.com"] -## Whether or not to enable redact log. +## Whether to enable log redaction. It can be one of the following values: +## - false: disable log redaction. +## - true: enable log redaction and replace sensitive information with "?". +## - "MARKER": enable log redaction and enclose sensitive information in single guillemets ‹›.
# redact-info-log = false [security.encryption] diff --git a/pkg/autoscaling/handler.go b/pkg/autoscaling/handler.go index 7bffa8ec156..ea248fdcc55 100644 --- a/pkg/autoscaling/handler.go +++ b/pkg/autoscaling/handler.go @@ -41,22 +41,22 @@ func NewHTTPHandler(svr *server.Server, rd *render.Render) *HTTPHandler { func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { rc := h.svr.GetRaftCluster() if rc == nil { - _ = h.rd.JSON(w, http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error()) return } data, err := io.ReadAll(r.Body) r.Body.Close() if err != nil { - _ = h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } strategy := Strategy{} if err := json.Unmarshal(data, &strategy); err != nil { - _ = h.rd.JSON(w, http.StatusBadRequest, err.Error()) + h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } plan := calculate(rc, h.svr.GetPDServerConfig(), &strategy) - _ = h.rd.JSON(w, http.StatusOK, plan) + h.rd.JSON(w, http.StatusOK, plan) } diff --git a/pkg/core/region.go b/pkg/core/region.go index 851648b8fce..73f2fdd62e7 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -44,6 +44,7 @@ import ( const ( randomRegionMaxRetry = 10 scanRegionLimit = 1000 + CollectFactor = 0.9 ) // errRegionIsStale is error info for region is stale. @@ -746,7 +747,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { regionID := region.GetID() if logRunner != nil { debug = func(msg string, fields ...zap.Field) { - _ = logRunner.RunTask( + logRunner.RunTask( regionID, "DebugLog", func() { @@ -755,7 +756,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } info = func(msg string, fields ...zap.Field) { - _ = logRunner.RunTask( + logRunner.RunTask( regionID, "InfoLog", func() { @@ -1583,6 +1584,12 @@ func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int { return r.getNotFromStorageRegionsCntByStoreLocked(storeID) } +// IsStorePrepared checks if a store is prepared. +// For each store, the number of active regions should be more than total region of the store * CollectFactor +func (r *RegionsInfo) IsStorePrepared(storeID uint64) bool { + return float64(r.GetNotFromStorageRegionsCntByStore(storeID)) >= float64(r.GetStoreRegionCount(storeID))*CollectFactor +} + // getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID. func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int { return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount() @@ -2042,14 +2049,6 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string { return strings.Join(ret, ", ") } -// String converts slice of bytes to string without copy. -func String(b []byte) string { - if len(b) == 0 { - return "" - } - return unsafe.String(unsafe.SliceData(b), len(b)) -} - // ToUpperASCIIInplace bytes.ToUpper but zero-cost func ToUpperASCIIInplace(s []byte) []byte { hasLower := false @@ -2088,7 +2087,7 @@ func HexRegionKey(key []byte) []byte { // HexRegionKeyStr converts region key to hex format. Used for formatting region in // logs. 
func HexRegionKeyStr(key []byte) string { - return String(HexRegionKey(key)) + return typeutil.BytesToString(HexRegionKey(key)) } // RegionToHexMeta converts a region meta's keys to hex format. Used for formatting diff --git a/pkg/mcs/metastorage/server/grpc_service.go b/pkg/mcs/metastorage/server/grpc_service.go index f018dc72f9f..c3ecfa572a1 100644 --- a/pkg/mcs/metastorage/server/grpc_service.go +++ b/pkg/mcs/metastorage/server/grpc_service.go @@ -71,9 +71,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. -func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } func (s *Service) checkServing() error { diff --git a/pkg/mcs/registry/registry.go b/pkg/mcs/registry/registry.go index 02e49f63d75..c6470b34dc4 100644 --- a/pkg/mcs/registry/registry.go +++ b/pkg/mcs/registry/registry.go @@ -37,7 +37,7 @@ type ServiceBuilder func(bs.Server) RegistrableService // RegistrableService is the interface that should wraps the RegisterService method. type RegistrableService interface { RegisterGRPCService(g *grpc.Server) - RegisterRESTHandler(userDefineHandlers map[string]http.Handler) + RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error } // ServiceRegistry is a map that stores all registered grpc services. @@ -82,14 +82,20 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http for name, builder := range r.builders { serviceName := createServiceName(prefix, name) if l, ok := r.services[serviceName]; ok { - l.RegisterRESTHandler(h) - log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name)) + if err := l.RegisterRESTHandler(h); err != nil { + log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + } else { + log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name)) + } continue } l := builder(srv) r.services[serviceName] = l - l.RegisterRESTHandler(h) - log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) + if err := l.RegisterRESTHandler(h); err != nil { + log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + } else { + log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) + } } } diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 70862ffb89c..83197b70f2e 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -237,7 +237,9 @@ func (c *Config) Adjust(meta *toml.MetaData) error { } c.adjustLog(configMetaData.Child("log")) - c.Security.Encryption.Adjust() + if err := c.Security.Encryption.Adjust(); err != nil { + return err + } c.Controller.Adjust(configMetaData.Child("controller")) configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index 2f35042c48f..93243eb05be 
100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -76,9 +76,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. -func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } // GetManager returns the resource manager. @@ -228,6 +228,8 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu log.Debug("finish token request from", zap.String("resource-group", resourceGroupName)) resps.Responses = append(resps.Responses, resp) } - stream.Send(resps) + if err := stream.Send(resps); err != nil { + return errors.WithStack(err) + } } } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 418d188823f..13e46ea0bba 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -349,7 +349,9 @@ func (m *Manager) persistResourceGroupRunningState() { m.RLock() group, ok := m.groups[keys[idx]] if ok { - group.persistStates(m.storage) + if err := group.persistStates(m.storage); err != nil { + log.Error("persist resource group state failed", zap.Error(err)) + } } m.RUnlock() } diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 1fac592f791..708a11344d4 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -180,7 +180,9 @@ func (s *Server) campaignLeader() { log.Info("triggering the primary callback functions") for _, cb := range s.primaryCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to trigger the primary callback function", errs.ZapError(err)) + } } s.participant.EnableLeader() @@ -213,7 +215,9 @@ func (s *Server) Close() { } log.Info("closing resource manager server ...") - s.serviceRegister.Deregister() + if err := s.serviceRegister.Deregister(); err != nil { + log.Error("failed to deregister the service", errs.ZapError(err)) + } utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() @@ -362,10 +366,14 @@ func CreateServer(ctx context.Context, cfg *Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { - cmd.Flags().Parse(args) + err := cmd.Flags().Parse(args) + if err != nil { + cmd.Println(err) + return + } cfg := NewConfig() flagSet := cmd.Flags() - err := cfg.Parse(flagSet) + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { @@ -389,8 +397,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { log.Fatal("initialize logger error", errs.ZapError(err)) } // Flushing any buffered log entries - defer log.Sync() - + log.Sync() versioninfo.Log(serviceName) log.Info("resource manager config", zap.Reflect("config", cfg)) diff --git a/pkg/mcs/resourcemanager/server/testutil.go b/pkg/mcs/resourcemanager/server/testutil.go index 3de0e32c0ab..0277e5e8a8f 100644 --- a/pkg/mcs/resourcemanager/server/testutil.go +++ b/pkg/mcs/resourcemanager/server/testutil.go @@ -32,7 +32,7 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) 
(*S re.NoError(err) log.ReplaceGlobals(cfg.Logger, cfg.LogProps) // Flushing any buffered log entries - defer log.Sync() + log.Sync() s := CreateServer(ctx, cfg) if err = s.Run(); err != nil { diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 4062ed38fd6..b18db7c0798 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -624,7 +624,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - _ = ctx.TaskRunner.RunTask( + ctx.TaskRunner.RunTask( regionID, ratelimit.ObserveRegionStatsAsync, func() { @@ -636,7 +636,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c } // region is not updated to the subtree. if origin.GetRef() < 2 { - _ = ctx.TaskRunner.RunTask( + ctx.TaskRunner.RunTask( regionID, ratelimit.UpdateSubTree, func() { @@ -660,7 +660,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c tracer.OnSaveCacheFinished() return err } - _ = ctx.TaskRunner.RunTask( + ctx.TaskRunner.RunTask( regionID, ratelimit.UpdateSubTree, func() { @@ -669,7 +669,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() - _ = ctx.TaskRunner.RunTask( + ctx.TaskRunner.RunTask( regionID, ratelimit.HandleOverlaps, func() { @@ -679,7 +679,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c } tracer.OnSaveCacheFinished() // handle region stats - _ = ctx.TaskRunner.RunTask( + ctx.TaskRunner.RunTask( regionID, ratelimit.CollectRegionStatsAsync, func() { diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 9dc6590a0b4..2111aa3ddcc 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -146,7 +146,9 @@ func (c *Config) adjust(meta *toml.MetaData) error { } c.adjustLog(configMetaData.Child("log")) - c.Security.Encryption.Adjust() + if err := c.Security.Encryption.Adjust(); err != nil { + return err + } configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 842e876885c..77d9a722d23 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -336,9 +336,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. 
-func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader { diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 47a7cf9962b..8eb9e49d964 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -316,7 +316,9 @@ func (s *Server) Close() { } log.Info("closing scheduling server ...") - s.serviceRegister.Deregister() + if err := s.serviceRegister.Deregister(); err != nil { + log.Error("failed to deregister the service", errs.ZapError(err)) + } utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() @@ -563,10 +565,14 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { schedulers.Register() - cmd.Flags().Parse(args) + err := cmd.Flags().Parse(args) + if err != nil { + cmd.Println(err) + return + } cfg := config.NewConfig() flagSet := cmd.Flags() - err := cfg.Parse(flagSet) + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { @@ -590,7 +596,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { log.Fatal("initialize logger error", errs.ZapError(err)) } // Flushing any buffered log entries - defer log.Sync() + log.Sync() versioninfo.Log(serviceName) log.Info("scheduling service config", zap.Reflect("config", cfg)) diff --git a/pkg/mcs/scheduling/server/testutil.go b/pkg/mcs/scheduling/server/testutil.go index 74baac44808..aba88945434 100644 --- a/pkg/mcs/scheduling/server/testutil.go +++ b/pkg/mcs/scheduling/server/testutil.go @@ -33,8 +33,7 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Conf re.NoError(err) log.ReplaceGlobals(cfg.Logger, cfg.LogProps) // Flushing any buffered log entries - defer log.Sync() - + log.Sync() s := CreateServer(ctx, cfg) if err = s.Run(); err != nil { return nil, nil, err diff --git a/pkg/mcs/tso/server/config.go b/pkg/mcs/tso/server/config.go index 06e9054e117..8cfef98ebaf 100644 --- a/pkg/mcs/tso/server/config.go +++ b/pkg/mcs/tso/server/config.go @@ -224,9 +224,7 @@ func (c *Config) Adjust(meta *toml.MetaData) error { } c.adjustLog(configMetaData.Child("log")) - c.Security.Encryption.Adjust() - - return nil + return c.Security.Encryption.Adjust() } func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 0a075a45c05..7dd9c6b5605 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -78,9 +78,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. 
-func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } // Tso returns a stream of timestamps diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c38c7142730..60ce2917ed5 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -174,7 +174,9 @@ func (s *Server) Close() { log.Info("closing tso server ...") // close tso service loops in the keyspace group manager s.keyspaceGroupManager.Close() - s.serviceRegister.Deregister() + if err := s.serviceRegister.Deregister(); err != nil { + log.Error("failed to deregister the service", errs.ZapError(err)) + } utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() @@ -435,10 +437,14 @@ func CreateServer(ctx context.Context, cfg *Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { - cmd.Flags().Parse(args) + err := cmd.Flags().Parse(args) + if err != nil { + cmd.Println(err) + return + } cfg := NewConfig() flagSet := cmd.Flags() - err := cfg.Parse(flagSet) + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { @@ -462,7 +468,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { log.Fatal("initialize logger error", errs.ZapError(err)) } // Flushing any buffered log entries - defer log.Sync() + log.Sync() versioninfo.Log(serviceName) log.Info("TSO service config", zap.Reflect("config", cfg)) diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index b70b050617e..fb78f0b4be3 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -324,6 +324,6 @@ func StopGRPCServer(s server) { // Exit exits the program with the given code. func Exit(code int) { - _ = log.Sync() + log.Sync() os.Exit(code) } diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 5f6b212529b..fd55874ce30 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -468,31 +468,31 @@ func (m *ModeManager) tickUpdateState() { case drStateSync: // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. 
if !canSync && hasMajority { - m.drSwitchToAsyncWait(storeIDs[primaryUp]) + _ = m.drSwitchToAsyncWait(storeIDs[primaryUp]) } case drStateAsyncWait: if canSync { - m.drSwitchToSync() + _ = m.drSwitchToSync() break } if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) { - m.drSwitchToAsyncWait(storeIDs[primaryUp]) + _ = m.drSwitchToAsyncWait(storeIDs[primaryUp]) break } if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { - m.drSwitchToAsync(storeIDs[primaryUp]) + _ = m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration { - m.drSwitchToSyncRecover() + _ = m.drSwitchToSyncRecover() break } if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { - m.drSwitchToAsync(storeIDs[primaryUp]) + _ = m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateSyncRecover: if !canSync && hasMajority { - m.drSwitchToAsync(storeIDs[primaryUp]) + _ = m.drSwitchToAsync(storeIDs[primaryUp]) } else { m.updateProgress() progress := m.estimateProgress() @@ -500,7 +500,7 @@ func (m *ModeManager) tickUpdateState() { drRecoverProgressGauge.Set(float64(progress)) if progress == 1.0 { - m.drSwitchToSync() + _ = m.drSwitchToSync() } else { m.updateRecoverProgress(progress) } diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index d19a4f70d66..bbaada98924 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -87,8 +87,7 @@ func TestStatus(t *testing.T) { }, }, rep.GetReplicationStatus()) - err = rep.drSwitchToAsync(nil) - re.NoError(err) + re.NoError(rep.drSwitchToAsync(nil)) re.Equal(&pb.ReplicationStatus{ Mode: pb.ReplicationMode_DR_AUTO_SYNC, DrAutoSync: &pb.DRAutoSync{ @@ -99,8 +98,7 @@ func TestStatus(t *testing.T) { }, }, rep.GetReplicationStatus()) - err = rep.drSwitchToSyncRecover() - re.NoError(err) + re.NoError(rep.drSwitchToSyncRecover()) stateID := rep.drAutoSync.StateID re.Equal(&pb.ReplicationStatus{ Mode: pb.ReplicationMode_DR_AUTO_SYNC, @@ -327,7 +325,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() - rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) + re.NoError(rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5})) rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour) rep.tickUpdateState() re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout @@ -346,14 +344,14 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateAsync, rep.drGetState()) assertStateIDUpdate() // lost majority, does not switch to async. 
- rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) assertStateIDUpdate() setStoreState(cluster, "down", "down", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) // sync_recover -> sync - rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) assertStateIDUpdate() setStoreState(cluster, "up", "up", "up", "up", "up", "up") cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5) @@ -500,7 +498,7 @@ func TestRecoverProgress(t *testing.T) { re.NoError(err) prepare := func(n int, asyncRegions []int) { - rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) regions := genRegions(cluster, rep.drAutoSync.StateID, n) for _, i := range asyncRegions { regions[i] = regions[i].Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ @@ -565,7 +563,7 @@ func TestRecoverProgressWithSplitAndMerge(t *testing.T) { re.NoError(err) prepare := func(n int, asyncRegions []int) { - rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) regions := genRegions(cluster, rep.drAutoSync.StateID, n) for _, i := range asyncRegions { regions[i] = regions[i].Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index fb22303f0b7..55913a13341 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -46,7 +46,6 @@ import ( const ( runSchedulerCheckInterval = 3 * time.Second checkSuspectRangesInterval = 100 * time.Millisecond - collectFactor = 0.9 collectTimeout = 5 * time.Minute maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index 126e3bba41d..df7074b9073 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -51,8 +51,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti } notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt() totalRegionsCnt := c.GetTotalRegionCount() - // The number of active regions should be more than total region of all stores * collectFactor - if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) { + // The number of active regions should be more than total region of all stores * core.CollectFactor + if float64(totalRegionsCnt)*core.CollectFactor > float64(notLoadedFromRegionsCnt) { return false } for _, store := range c.GetStores() { @@ -61,11 +61,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti } storeID := store.GetID() // It is used to avoid sudden scheduling when scheduling service is just started. 
- if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { + if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { return false } - // For each store, the number of active regions should be more than total region of the store * collectFactor - if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { + if !c.IsStorePrepared(storeID) { return false } } diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 24471e1980d..c4b9cd6ab5e 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -86,10 +86,14 @@ func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, any) { newConfig, _ := json.Marshal(conf) if !bytes.Equal(oldConfig, newConfig) { if !conf.validateLocked() { - json.Unmarshal(oldConfig, conf) + if err := json.Unmarshal(oldConfig, conf); err != nil { + return http.StatusInternalServerError, err.Error() + } return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10" } - conf.persistLocked() + if err := conf.persistLocked(); err != nil { + log.Warn("failed to save balance-leader-scheduler config", errs.ZapError(err)) + } log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldConfig), zap.ByteString("new", newConfig)) return http.StatusOK, "Config is updated." } @@ -161,12 +165,12 @@ func (handler *balanceLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http data, _ := io.ReadAll(r.Body) r.Body.Close() httpCode, v := handler.config.Update(data) - _ = handler.rd.JSON(w, httpCode, v) + handler.rd.JSON(w, httpCode, v) } func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } type balanceLeaderScheduler struct { diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 0cf69d67e28..0225d948815 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -151,12 +151,12 @@ func (handler *balanceWitnessHandler) UpdateConfig(w http.ResponseWriter, r *htt data, _ := io.ReadAll(r.Body) r.Body.Close() httpCode, v := handler.config.Update(data) - _ = handler.rd.JSON(w, httpCode, v) + handler.rd.JSON(w, httpCode, v) } func (handler *balanceWitnessHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } type balanceWitnessScheduler struct { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 63ca6013584..8f56643f384 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -373,7 +373,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R if _, exists = handler.config.StoreIDWithRanges[id]; !exists { if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { handler.config.RUnlock() - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } } @@ -390,28 +390,28 @@ func (handler 
*evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R err := handler.config.BuildWithArgs(args) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } err = handler.config.Persist() if err != nil { handler.config.removeStore(id) - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - _ = handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) { idStr := mux.Vars(r)["store_id"] id, err := strconv.ParseUint(idStr, 10, 64) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } @@ -422,26 +422,26 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R err = handler.config.Persist() if err != nil { handler.config.resetStore(id, keyRanges) - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } if last { if err := handler.config.removeSchedulerCb(EvictLeaderName); err != nil { if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { - _ = handler.rd.JSON(w, http.StatusNotFound, err.Error()) + handler.rd.JSON(w, http.StatusNotFound, err.Error()) } else { handler.config.resetStore(id, keyRanges) - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) } return } resp = lastStoreDeleteInfo } - _ = handler.rd.JSON(w, http.StatusOK, resp) + handler.rd.JSON(w, http.StatusOK, resp) return } - _ = handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error()) + handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error()) } func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 8989cd5de3f..9b13e292c87 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -160,7 +160,7 @@ func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *htt } recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) if !ok { - _ = handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) return } handler.config.Lock() @@ -169,17 +169,17 @@ func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *htt recoveryDurationGap := uint64(recoveryDurationGapFloat) handler.config.RecoveryDurationGap = recoveryDurationGap if err := handler.config.persistLocked(); err != nil { - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) handler.config.RecoveryDurationGap = prevRecoveryDurationGap return } 
log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) - _ = handler.rd.JSON(w, http.StatusOK, "Config updated.") + handler.rd.JSON(w, http.StatusOK, "Config updated.") } func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } type evictSlowStoreScheduler struct { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 393a48aa282..da3dbc24e95 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -246,7 +246,7 @@ func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *htt } recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) if !ok { - _ = handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) return } handler.config.Lock() @@ -255,17 +255,17 @@ func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *htt recoveryDurationGap := uint64(recoveryDurationGapFloat) handler.config.RecoveryDurationGap = recoveryDurationGap if err := handler.config.persistLocked(); err != nil { - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) handler.config.RecoveryDurationGap = prevRecoveryDurationGap return } log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) - _ = handler.rd.JSON(w, http.StatusOK, "Config updated.") + handler.rd.JSON(w, http.StatusOK, "Config updated.") } func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } type evictSlowTrendScheduler struct { diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 56ed7cd730e..8dac759793c 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -210,39 +210,39 @@ func (handler *grantHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *htt } ids, ok := input["store-id"].(string) if !ok { - _ = handler.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerConfig) + handler.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerConfig) return } storeIDs := make([]uint64, 0) for _, v := range strings.Split(ids, ",") { id, err := strconv.ParseUint(v, 10, 64) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, errs.ErrBytesToUint64) + handler.rd.JSON(w, http.StatusBadRequest, errs.ErrBytesToUint64) return } storeIDs = append(storeIDs, id) } leaderID, err := strconv.ParseUint(input["store-leader-id"].(string), 10, 64) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, errs.ErrBytesToUint64) + handler.rd.JSON(w, http.StatusBadRequest, errs.ErrBytesToUint64) return } if !handler.config.setStore(leaderID, storeIDs) { - _ = handler.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerConfig) + handler.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerConfig) return } if err = handler.config.Persist(); 
err != nil { handler.config.SetStoreLeaderID(0) - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - _ = handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, nil) } func (handler *grantHotRegionHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handler { diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index ad0b1a09b79..4752ef3e61d 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -284,7 +284,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R if _, exists = handler.config.StoreIDWithRanges[id]; !exists { if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { handler.config.RUnlock() - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } } @@ -301,28 +301,28 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R err := handler.config.BuildWithArgs(args) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } err = handler.config.Persist() if err != nil { handler.config.removeStore(id) - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - _ = handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *grantLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) { idStr := mux.Vars(r)["store_id"] id, err := strconv.ParseUint(idStr, 10, 64) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } @@ -333,26 +333,26 @@ func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R err = handler.config.Persist() if err != nil { handler.config.resetStore(id, keyRanges) - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } if last { if err := handler.config.removeSchedulerCb(GrantLeaderName); err != nil { if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { - _ = handler.rd.JSON(w, http.StatusNotFound, err.Error()) + handler.rd.JSON(w, http.StatusNotFound, err.Error()) } else { handler.config.resetStore(id, keyRanges) - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) } return } resp = lastStoreDeleteInfo } - _ = handler.rd.JSON(w, http.StatusOK, resp) + handler.rd.JSON(w, http.StatusOK, resp) return } - _ = handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error()) + handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error()) } func 
newGrantLeaderHandler(config *grantLeaderSchedulerConfig) http.Handler { diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index d71e5e984bd..517edb1d637 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -379,7 +379,7 @@ func (conf *hotRegionSchedulerConfig) handleGetConfig(w http.ResponseWriter, _ * conf.RLock() defer conf.RUnlock() rd := render.New(render.Options{IndentJSON: true}) - _ = rd.JSON(w, http.StatusOK, conf.getValidConf()) + rd.JSON(w, http.StatusOK, conf.getValidConf()) } func isPriorityValid(priorities []string) (map[string]bool, error) { @@ -434,20 +434,20 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * data, err := io.ReadAll(r.Body) r.Body.Close() if err != nil { - _ = rd.JSON(w, http.StatusInternalServerError, err.Error()) + rd.JSON(w, http.StatusInternalServerError, err.Error()) return } if err := json.Unmarshal(data, conf); err != nil { - _ = rd.JSON(w, http.StatusInternalServerError, err.Error()) + rd.JSON(w, http.StatusInternalServerError, err.Error()) return } if err := conf.validateLocked(); err != nil { // revert to old version if err2 := json.Unmarshal(oldc, conf); err2 != nil { - _ = rd.JSON(w, http.StatusInternalServerError, err2.Error()) + rd.JSON(w, http.StatusInternalServerError, err2.Error()) } else { - _ = rd.JSON(w, http.StatusBadRequest, err.Error()) + rd.JSON(w, http.StatusBadRequest, err.Error()) } return } @@ -457,22 +457,22 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * log.Warn("failed to persist config", zap.Error(err)) } log.Info("hot-region-scheduler config is updated", zap.String("old", string(oldc)), zap.String("new", string(newc))) - _ = rd.Text(w, http.StatusOK, "Config is updated.") + rd.Text(w, http.StatusOK, "Config is updated.") return } m := make(map[string]any) if err := json.Unmarshal(data, &m); err != nil { - _ = rd.JSON(w, http.StatusInternalServerError, err.Error()) + rd.JSON(w, http.StatusInternalServerError, err.Error()) return } ok := reflectutil.FindSameFieldByJSON(conf, m) if ok { - _ = rd.Text(w, http.StatusOK, "Config is the same with origin, so do nothing.") + rd.Text(w, http.StatusOK, "Config is the same with origin, so do nothing.") return } - _ = rd.Text(w, http.StatusBadRequest, "Config item is not found.") + rd.Text(w, http.StatusBadRequest, "Config item is not found.") } func (conf *hotRegionSchedulerConfig) persistLocked() error { diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index a7fadf703eb..ebee66dc207 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -255,7 +255,7 @@ func (handler *scatterRangeHandler) UpdateConfig(w http.ResponseWriter, r *http. name, ok := input["range-name"].(string) if ok { if name != handler.config.GetRangeName() { - _ = handler.rd.JSON(w, http.StatusInternalServerError, errors.New("Cannot change the range name, please delete this schedule").Error()) + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("Cannot change the range name, please delete this schedule").Error()) return } args = append(args, name) @@ -278,19 +278,19 @@ func (handler *scatterRangeHandler) UpdateConfig(w http.ResponseWriter, r *http. 
} err := handler.config.BuildWithArgs(args) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } err = handler.config.Persist() if err != nil { - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) } - _ = handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, nil) } func (handler *scatterRangeHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } func newScatterRangeHandler(config *scatterRangeSchedulerConfig) http.Handler { diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 0b9021267cb..726138e8f7a 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -234,7 +234,7 @@ func (handler *shuffleHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *h } limit, ok := input["limit"].(float64) if !ok { - _ = handler.rd.JSON(w, http.StatusBadRequest, "invalid limit") + handler.rd.JSON(w, http.StatusBadRequest, "invalid limit") return } handler.config.Lock() @@ -243,16 +243,16 @@ func (handler *shuffleHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *h handler.config.Limit = uint64(limit) err := handler.config.persistLocked() if err != nil { - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) handler.config.Limit = previous return } - _ = handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, nil) } func (handler *shuffleHotRegionHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } func newShuffleHotRegionHandler(config *shuffleHotRegionSchedulerConfig) http.Handler { diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index 6a4a698aa5b..bce64f743b8 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -79,7 +79,7 @@ func (conf *shuffleRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *ht func (conf *shuffleRegionSchedulerConfig) handleGetRoles(w http.ResponseWriter, _ *http.Request) { rd := render.New(render.Options{IndentJSON: true}) - _ = rd.JSON(w, http.StatusOK, conf.GetRoles()) + rd.JSON(w, http.StatusOK, conf.GetRoles()) } func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, r *http.Request) { @@ -90,7 +90,7 @@ func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, } for _, r := range roles { if slice.NoneOf(allRoles, func(i int) bool { return allRoles[i] == r }) { - _ = rd.Text(w, http.StatusBadRequest, "invalid role:"+r) + rd.Text(w, http.StatusBadRequest, "invalid role:"+r) return } } @@ -101,10 +101,10 @@ func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, conf.Roles = roles if err := conf.persist(); err != nil { conf.Roles = old // revert - _ = rd.Text(w, http.StatusInternalServerError, err.Error()) + rd.Text(w, http.StatusInternalServerError, err.Error()) return } - _ = rd.Text(w, http.StatusOK, "Config is updated.") + rd.Text(w, http.StatusOK, "Config is updated.") } func (conf 
*shuffleRegionSchedulerConfig) persist() error { diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 2a22d695953..9b049bf6ba1 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -23,6 +23,8 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -112,7 +114,7 @@ type splitBucketHandler struct { func (h *splitBucketHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := h.conf.Clone() - _ = h.rd.JSON(w, http.StatusOK, conf) + h.rd.JSON(w, http.StatusOK, conf) } func (h *splitBucketHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { @@ -123,33 +125,35 @@ func (h *splitBucketHandler) UpdateConfig(w http.ResponseWriter, r *http.Request data, err := io.ReadAll(r.Body) defer r.Body.Close() if err != nil { - _ = rd.JSON(w, http.StatusInternalServerError, err.Error()) + rd.JSON(w, http.StatusInternalServerError, err.Error()) return } if err := json.Unmarshal(data, h.conf); err != nil { - _ = rd.JSON(w, http.StatusInternalServerError, err.Error()) + rd.JSON(w, http.StatusInternalServerError, err.Error()) return } newc, _ := json.Marshal(h.conf) if !bytes.Equal(oldc, newc) { - h.conf.persistLocked() - _ = rd.Text(w, http.StatusOK, "Config is updated.") + if err := h.conf.persistLocked(); err != nil { + log.Warn("failed to save config", errs.ZapError(err)) + } + rd.Text(w, http.StatusOK, "Config is updated.") return } m := make(map[string]any) if err := json.Unmarshal(data, &m); err != nil { - _ = rd.JSON(w, http.StatusInternalServerError, err.Error()) + rd.JSON(w, http.StatusInternalServerError, err.Error()) return } ok := reflectutil.FindSameFieldByJSON(h.conf, m) if ok { - _ = rd.Text(w, http.StatusOK, "Config is the same with origin, so do nothing.") + rd.Text(w, http.StatusOK, "Config is the same with origin, so do nothing.") return } - _ = rd.Text(w, http.StatusBadRequest, "Config item is not found.") + rd.Text(w, http.StatusBadRequest, "Config item is not found.") } func newSplitBucketHandler(conf *splitBucketSchedulerConfig) http.Handler { diff --git a/pkg/storage/endpoint/gc_safe_point.go b/pkg/storage/endpoint/gc_safe_point.go index c2f09980651..85b29e0b47e 100644 --- a/pkg/storage/endpoint/gc_safe_point.go +++ b/pkg/storage/endpoint/gc_safe_point.go @@ -100,7 +100,9 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf } if ssp.ExpiredAt < now.Unix() { - se.Remove(key) + if err := se.Remove(key); err != nil { + log.Error("failed to remove expired service safepoint", errs.ZapError(err)) + } continue } if ssp.SafePoint < min.SafePoint { diff --git a/pkg/storage/hot_region_storage.go b/pkg/storage/hot_region_storage.go index c08825dbba1..50fa7455f44 100644 --- a/pkg/storage/hot_region_storage.go +++ b/pkg/storage/hot_region_storage.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -266,8 +267,8 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR if err != nil { return err } - historyHotRegions[i].StartKey = core.String(region.StartKey) - historyHotRegions[i].EndKey = core.String(region.EndKey) + historyHotRegions[i].StartKey = 
typeutil.BytesToString(region.StartKey) + historyHotRegions[i].EndKey = typeutil.BytesToString(region.EndKey) key := HotRegionStorePath(hotRegionType, historyHotRegions[i].UpdateTime, historyHotRegions[i].RegionID) h.batchHotInfo[key] = &historyHotRegions[i] } @@ -385,8 +386,8 @@ func (it *HotRegionStorageIterator) Next() (*HistoryHotRegion, error) { if err := encryption.DecryptRegion(region, it.encryptionKeyManager); err != nil { return nil, err } - message.StartKey = core.String(region.StartKey) - message.EndKey = core.String(region.EndKey) + message.StartKey = typeutil.BytesToString(region.StartKey) + message.EndKey = typeutil.BytesToString(region.EndKey) message.EncryptionMeta = nil return &message, nil } diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index ccc32b13303..132b06aec69 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -282,7 +282,10 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync RegionLeaders: leaders, Buckets: buckets, } - s.limit.WaitN(ctx, resp.Size()) + if err := s.limit.WaitN(ctx, resp.Size()); err != nil { + log.Error("failed to wait rate limit", errs.ZapError(err)) + return err + } lastIndex += len(metas) if err := stream.Send(resp); err != nil { log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err)) diff --git a/pkg/utils/apiutil/apiutil.go b/pkg/utils/apiutil/apiutil.go index 2503ba9aecf..20465d8376c 100644 --- a/pkg/utils/apiutil/apiutil.go +++ b/pkg/utils/apiutil/apiutil.go @@ -116,14 +116,14 @@ func TagJSONError(err error) error { func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { if err == nil { log.Error("nil is given to errorResp") - _ = rd.JSON(w, http.StatusInternalServerError, "nil error") + rd.JSON(w, http.StatusInternalServerError, "nil error") return } if errCode := errcode.CodeChain(err); errCode != nil { w.Header().Set("TiDB-Error-Code", errCode.Code().CodeStr().String()) - _ = rd.JSON(w, errCode.Code().HTTPCode(), errcode.NewJSONFormat(errCode)) + rd.JSON(w, errCode.Code().HTTPCode(), errcode.NewJSONFormat(errCode)) } else { - _ = rd.JSON(w, http.StatusInternalServerError, err.Error()) + rd.JSON(w, http.StatusInternalServerError, err.Error()) } } diff --git a/pkg/utils/configutil/configutil.go b/pkg/utils/configutil/configutil.go index 086f74ff842..48be7ff8c02 100644 --- a/pkg/utils/configutil/configutil.go +++ b/pkg/utils/configutil/configutil.go @@ -25,6 +25,7 @@ import ( "github.com/spf13/pflag" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/typeutil" ) @@ -78,9 +79,12 @@ func (m *ConfigMetaData) CheckUndecoded() error { // SecurityConfig indicates the security configuration type SecurityConfig struct { grpcutil.TLSConfig - // RedactInfoLog indicates that whether enabling redact log - RedactInfoLog bool `toml:"redact-info-log" json:"redact-info-log"` - Encryption encryption.Config `toml:"encryption" json:"encryption"` + // RedactInfoLog indicates whether to enable log redaction. It can be one of the following values: + // - false: disable log redaction. + // - true: enable log redaction and replace sensitive information with "?". + // - "MARKER": enable log redaction and enclose sensitive information in single guillemets ‹›.
+ RedactInfoLog logutil.RedactInfoLogType `toml:"redact-info-log" json:"redact-info-log"` + Encryption encryption.Config `toml:"encryption" json:"encryption"` } // PrintConfigCheckMsg prints the message about configuration checks. diff --git a/pkg/utils/logutil/log.go b/pkg/utils/logutil/log.go index ff6ffa7af9a..c7a9ac2f3b7 100644 --- a/pkg/utils/logutil/log.go +++ b/pkg/utils/logutil/log.go @@ -15,12 +15,15 @@ package logutil import ( + "encoding/json" + "errors" "fmt" "strings" "sync/atomic" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -67,16 +70,19 @@ func StringToZapLogLevel(level string) zapcore.Level { } // SetupLogger setup the logger. -func SetupLogger(logConfig log.Config, logger **zap.Logger, logProps **log.ZapProperties, enabled ...bool) error { +func SetupLogger( + logConfig log.Config, + logger **zap.Logger, + logProps **log.ZapProperties, + redactInfoLogType RedactInfoLogType, +) error { lg, p, err := log.InitLogger(&logConfig, zap.AddStacktrace(zapcore.FatalLevel)) if err != nil { return errs.ErrInitLogger.Wrap(err) } *logger = lg *logProps = p - if len(enabled) > 0 { - SetRedactLog(enabled[0]) - } + setRedactType(redactInfoLogType) return nil } @@ -88,22 +94,111 @@ func LogPanic() { } } +// RedactInfoLogType is the behavior of redacting sensitive information in logs. +type RedactInfoLogType int + +const ( + // RedactInfoLogOFF means log redaction is disabled. + RedactInfoLogOFF RedactInfoLogType = iota + // RedactInfoLogON means log redaction is enabled, and will replace the sensitive information with "?". + RedactInfoLogON + // RedactInfoLogMarker means log redaction is enabled, and will use single guillemets ‹› to enclose the sensitive information. + RedactInfoLogMarker +) + +// MarshalJSON implements the `json.Marshaler` interface to ensure compatibility. +func (t RedactInfoLogType) MarshalJSON() ([]byte, error) { + switch t { + case RedactInfoLogON: + return json.Marshal(true) + case RedactInfoLogMarker: + return json.Marshal("MARKER") + default: + } + return json.Marshal(false) } + +const invalidRedactInfoLogTypeErrMsg = `the "redact-info-log" value is invalid; it should be either false, true, or "MARKER"` + +// UnmarshalJSON implements the `json.Unmarshaler` interface to ensure compatibility. +func (t *RedactInfoLogType) UnmarshalJSON(data []byte) error { + var s string + err := json.Unmarshal(data, &s) + if err == nil && strings.ToUpper(s) == "MARKER" { + *t = RedactInfoLogMarker + return nil + } + var b bool + err = json.Unmarshal(data, &b) + if err != nil { + return errors.New(invalidRedactInfoLogTypeErrMsg) + } + if b { + *t = RedactInfoLogON + } else { + *t = RedactInfoLogOFF + } + return nil +} + +// UnmarshalTOML implements the `toml.Unmarshaler` interface to ensure compatibility.
+func (t *RedactInfoLogType) UnmarshalTOML(data any) error { + switch v := data.(type) { + case bool: + if v { + *t = RedactInfoLogON + } else { + *t = RedactInfoLogOFF + } + return nil + case string: + if strings.ToUpper(v) == "MARKER" { + *t = RedactInfoLogMarker + return nil + } + return errors.New(invalidRedactInfoLogTypeErrMsg) + default: + } + return errors.New(invalidRedactInfoLogTypeErrMsg) +} + var ( - enabledRedactLog atomic.Value + curRedactType atomic.Value ) func init() { - SetRedactLog(false) + setRedactType(RedactInfoLogOFF) +} + +func getRedactType() RedactInfoLogType { + return curRedactType.Load().(RedactInfoLogType) } -// IsRedactLogEnabled indicates whether the log desensitization is enabled -func IsRedactLogEnabled() bool { - return enabledRedactLog.Load().(bool) +func setRedactType(redactInfoLogType RedactInfoLogType) { + curRedactType.Store(redactInfoLogType) } -// SetRedactLog sets enabledRedactLog -func SetRedactLog(enabled bool) { - enabledRedactLog.Store(enabled) +const ( + leftMark = '‹' + rightMark = '›' +) + +func redactInfo(input string) string { + res := &strings.Builder{} + res.Grow(len(input) + 2) + _, _ = res.WriteRune(leftMark) + for _, c := range input { + // Double the mark character if it is already in the input string. + // to avoid the ambiguity of the redacted content. + if c == leftMark || c == rightMark { + _, _ = res.WriteRune(c) + _, _ = res.WriteRune(c) + } else { + _, _ = res.WriteRune(c) + } + } + _, _ = res.WriteRune(rightMark) + return res.String() } // ZapRedactByteString receives []byte argument and return omitted information zap.Field if redact log enabled @@ -123,34 +218,48 @@ func ZapRedactStringer(key string, arg fmt.Stringer) zap.Field { // RedactBytes receives []byte argument and return omitted information if redact log enabled func RedactBytes(arg []byte) []byte { - if IsRedactLogEnabled() { + switch getRedactType() { + case RedactInfoLogON: return []byte("?") + case RedactInfoLogMarker: + // Use unsafe conversion to avoid copy. + return typeutil.StringToBytes(redactInfo(typeutil.BytesToString(arg))) + default: } return arg } // RedactString receives string argument and return omitted information if redact log enabled func RedactString(arg string) string { - if IsRedactLogEnabled() { + switch getRedactType() { + case RedactInfoLogON: return "?" + case RedactInfoLogMarker: + return redactInfo(arg) + default: } return arg } // RedactStringer receives stringer argument and return omitted information if redact log enabled func RedactStringer(arg fmt.Stringer) fmt.Stringer { - if IsRedactLogEnabled() { - return stringer{} + switch getRedactType() { + case RedactInfoLogON: + return &redactedStringer{"?"} + case RedactInfoLogMarker: + return &redactedStringer{redactInfo(arg.String())} + default: } return arg } -type stringer struct { +type redactedStringer struct { + content string } // String implement fmt.Stringer -func (stringer) String() string { - return "?" +func (rs *redactedStringer) String() string { + return rs.content } // CondUint32 constructs a field with the given key and value conditionally. 
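The logutil changes above turn redaction from a boolean toggle into a three-valued mode that callers now pass to SetupLogger explicitly. Below is a minimal, hedged sketch of how a caller could wire this up and what the marker mode produces; the `log.Config{Level: "info"}` literal and the sample key values are illustrative and not taken from this patch.

```go
package main

import (
	"fmt"

	"github.com/pingcap/log"
	"github.com/tikv/pd/pkg/utils/logutil"
	"go.uber.org/zap"
)

func main() {
	var (
		logger   *zap.Logger
		logProps *log.ZapProperties
	)
	// The redaction mode is now a required argument instead of the old variadic `enabled ...bool`.
	if err := logutil.SetupLogger(log.Config{Level: "info"}, &logger, &logProps, logutil.RedactInfoLogMarker); err != nil {
		panic(err)
	}
	log.ReplaceGlobals(logger, logProps)

	// In marker mode, redacted values are wrapped in single guillemets, and any
	// guillemets already inside the value are doubled so the boundaries stay unambiguous.
	fmt.Println(logutil.RedactString("region start key")) // ‹region start key›
	fmt.Println(logutil.RedactString("a‹b›c"))            // ‹a‹‹b››c›

	// RedactInfoLogON would return "?" for the same inputs; RedactInfoLogOFF returns them unchanged.
	logger.Info("handle region", zap.String("start-key", logutil.RedactString("740000FF0A")))
}
```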
diff --git a/pkg/utils/logutil/log_test.go b/pkg/utils/logutil/log_test.go index 650ba62fe9d..ae0534bbc7a 100644 --- a/pkg/utils/logutil/log_test.go +++ b/pkg/utils/logutil/log_test.go @@ -15,9 +15,11 @@ package logutil import ( - "fmt" + "encoding/json" + "strings" "testing" + "github.com/BurntSushi/toml" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" ) @@ -33,52 +35,185 @@ func TestStringToZapLogLevel(t *testing.T) { re.Equal(zapcore.InfoLevel, StringToZapLogLevel("whatever")) } +func TestRedactInfoLogType(t *testing.T) { + re := require.New(t) + // JSON unmarshal. + jsonUnmarshalTestCases := []struct { + jsonStr string + expect RedactInfoLogType + expectErr bool + }{ + {`false`, RedactInfoLogOFF, false}, + {`true`, RedactInfoLogON, false}, + {`"MARKER"`, RedactInfoLogMarker, false}, + {`"marker"`, RedactInfoLogMarker, false}, + {`"OTHER"`, RedactInfoLogOFF, true}, + {`"OFF"`, RedactInfoLogOFF, true}, + {`"ON"`, RedactInfoLogOFF, true}, + {`"off"`, RedactInfoLogOFF, true}, + {`"on"`, RedactInfoLogOFF, true}, + {`""`, RedactInfoLogOFF, true}, + {`"fALSe"`, RedactInfoLogOFF, true}, + {`"trUE"`, RedactInfoLogOFF, true}, + } + var redactType RedactInfoLogType + for idx, tc := range jsonUnmarshalTestCases { + t.Logf("test case %d: %s", idx, tc.jsonStr) + err := json.Unmarshal([]byte(tc.jsonStr), &redactType) + if tc.expectErr { + re.Error(err) + re.ErrorContains(err, invalidRedactInfoLogTypeErrMsg) + } else { + re.NoError(err) + re.Equal(tc.expect, redactType) + } + } + // JSON marshal. + jsonMarshalTestCases := []struct { + typ RedactInfoLogType + expect string + }{ + {RedactInfoLogOFF, `false`}, + {RedactInfoLogON, `true`}, + {RedactInfoLogMarker, `"MARKER"`}, + } + for _, tc := range jsonMarshalTestCases { + b, err := json.Marshal(tc.typ) + re.NoError(err) + re.Equal(tc.expect, string(b)) + } + // TOML unmarshal. 
+ tomlTestCases := []struct { + tomlStr string + expect RedactInfoLogType + expectErr bool + }{ + {`redact-info-log = false`, RedactInfoLogOFF, false}, + {`redact-info-log = true`, RedactInfoLogON, false}, + {`redact-info-log = "MARKER"`, RedactInfoLogMarker, false}, + {`redact-info-log = "marker"`, RedactInfoLogMarker, false}, + {`redact-info-log = "OTHER"`, RedactInfoLogOFF, true}, + {`redact-info-log = "OFF"`, RedactInfoLogOFF, true}, + {`redact-info-log = "ON"`, RedactInfoLogOFF, true}, + {`redact-info-log = "off"`, RedactInfoLogOFF, true}, + {`redact-info-log = "on"`, RedactInfoLogOFF, true}, + {`redact-info-log = ""`, RedactInfoLogOFF, true}, + {`redact-info-log = "fALSe"`, RedactInfoLogOFF, true}, + {`redact-info-log = "trUE"`, RedactInfoLogOFF, true}, + } + var config struct { + RedactInfoLog RedactInfoLogType `toml:"redact-info-log"` + } + for _, tc := range tomlTestCases { + _, err := toml.Decode(tc.tomlStr, &config) + if tc.expectErr { + re.Error(err) + re.ErrorContains(err, invalidRedactInfoLogTypeErrMsg) + } else { + re.NoError(err) + re.Equal(tc.expect, config.RedactInfoLog) + } + } +} + func TestRedactLog(t *testing.T) { re := require.New(t) testCases := []struct { - name string - arg any - enableRedactLog bool - expect any + name string + arg any + redactInfoLogType RedactInfoLogType + expect any }{ { - name: "string arg, enable redact", - arg: "foo", - enableRedactLog: true, - expect: "?", + name: "string arg, enable redact", + arg: "foo", + redactInfoLogType: RedactInfoLogON, + expect: "?", + }, + { + name: "string arg", + arg: "foo", + redactInfoLogType: RedactInfoLogOFF, + expect: "foo", + }, + { + name: "[]byte arg, enable redact", + arg: []byte("foo"), + redactInfoLogType: RedactInfoLogON, + expect: []byte("?"), + }, + { + name: "[]byte arg", + arg: []byte("foo"), + redactInfoLogType: RedactInfoLogOFF, + expect: []byte("foo"), + }, + { + name: "string arg, enable redact marker", + arg: "foo", + redactInfoLogType: RedactInfoLogMarker, + expect: "‹foo›", + }, + { + name: "string arg contains left marker, enable redact marker", + arg: "f‹oo", + redactInfoLogType: RedactInfoLogMarker, + expect: "‹f‹‹oo›", + }, + { + name: "string arg contains right marker, enable redact marker", + arg: "foo›", + redactInfoLogType: RedactInfoLogMarker, + expect: "‹foo›››", + }, + { + name: "string arg contains marker, enable redact marker", + arg: "f‹oo›", + redactInfoLogType: RedactInfoLogMarker, + expect: "‹f‹‹oo›››", + }, + { + name: "[]byte arg, enable redact marker", + arg: []byte("foo"), + redactInfoLogType: RedactInfoLogMarker, + expect: []byte("‹foo›"), }, { - name: "string arg", - arg: "foo", - enableRedactLog: false, - expect: "foo", + name: "[]byte arg contains left marker, enable redact marker", + arg: []byte("foo‹"), + redactInfoLogType: RedactInfoLogMarker, + expect: []byte("‹foo‹‹›"), }, { - name: "[]byte arg, enable redact", - arg: []byte("foo"), - enableRedactLog: true, - expect: []byte("?"), + name: "[]byte arg contains right marker, enable redact marker", + arg: []byte("›foo"), + redactInfoLogType: RedactInfoLogMarker, + expect: []byte("‹››foo›"), }, { - name: "[]byte arg", - arg: []byte("foo"), - enableRedactLog: false, - expect: []byte("foo"), + name: "[]byte arg contains marker, enable redact marker", + arg: []byte("f›o‹o"), + redactInfoLogType: RedactInfoLogMarker, + expect: []byte("‹f››o‹‹o›"), }, } for _, testCase := range testCases { - t.Log(testCase.name) - SetRedactLog(testCase.enableRedactLog) + setRedactType(testCase.redactInfoLogType) + // Create 
`fmt.Stringer`s to test `RedactStringer` later. + var argStringer, expectStringer = &strings.Builder{}, &strings.Builder{} switch r := testCase.arg.(type) { case []byte: - re.Equal(testCase.expect, RedactBytes(r)) + re.Equal(testCase.expect, RedactBytes(r), testCase.name) + argStringer.Write((testCase.arg).([]byte)) + expectStringer.Write((testCase.expect).([]byte)) case string: - re.Equal(testCase.expect, RedactString(r)) - case fmt.Stringer: - re.Equal(testCase.expect, RedactStringer(r)) + re.Equal(testCase.expect, RedactString(r), testCase.name) + argStringer.WriteString((testCase.arg).(string)) + expectStringer.WriteString((testCase.expect).(string)) default: - panic("unmatched case") + re.FailNow("unmatched case", testCase.name) } + re.Equal(expectStringer.String(), RedactStringer(argStringer).String(), testCase.name) } } diff --git a/pkg/utils/typeutil/conversion.go b/pkg/utils/typeutil/conversion.go index 128c7a887a4..dab12a52d9e 100644 --- a/pkg/utils/typeutil/conversion.go +++ b/pkg/utils/typeutil/conversion.go @@ -16,6 +16,7 @@ package typeutil import ( "encoding/binary" + "unsafe" "github.com/tikv/pd/pkg/errs" ) @@ -68,3 +69,19 @@ func JSONToUint64Slice(from any) ([]uint64, bool) { } return to, true } + +// BytesToString converts slice of bytes to string without copy. +func BytesToString(b []byte) string { + if len(b) == 0 { + return "" + } + return unsafe.String(unsafe.SliceData(b), len(b)) +} + +// StringToBytes converts string to slice of bytes without copy. +func StringToBytes(s string) []byte { + if len(s) == 0 { + return nil + } + return unsafe.Slice(unsafe.StringData(s), len(s)) +} diff --git a/pkg/utils/typeutil/conversion_test.go b/pkg/utils/typeutil/conversion_test.go index 7b17cfcbe2c..e69eeb57e23 100644 --- a/pkg/utils/typeutil/conversion_test.go +++ b/pkg/utils/typeutil/conversion_test.go @@ -73,3 +73,17 @@ func TestJSONToUint64Slice(t *testing.T) { re.False(ok) re.Nil(res) } + +func TestBytesToString(t *testing.T) { + re := require.New(t) + str := "hello" + b := []byte(str) + re.Equal(str, BytesToString(b)) +} + +func TestStringToBytes(t *testing.T) { + re := require.New(t) + str := "hello" + b := StringToBytes(str) + re.Equal([]byte(str), b) +} diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 1e26a97e12c..a37874a8461 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -259,7 +259,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R id = (uint64)(idFloat) if _, exists = handler.config.StoreIDWitRanges[id]; !exists { if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } } @@ -275,27 +275,27 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R err := handler.config.BuildWithArgs(args) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } err = handler.config.Persist() if err != nil { - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - _ = handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, nil) } func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() - _ = handler.rd.JSON(w, 
http.StatusOK, conf) + handler.rd.JSON(w, http.StatusOK, conf) } func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) { idStr := mux.Vars(r)["store_id"] id, err := strconv.ParseUint(idStr, 10, 64) if err != nil { - _ = handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } @@ -303,7 +303,7 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R defer handler.config.mu.Unlock() _, exists := handler.config.StoreIDWitRanges[id] if !exists { - _ = handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) return } delete(handler.config.StoreIDWitRanges, id) @@ -312,7 +312,7 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R handler.config.mu.Unlock() if err := handler.config.Persist(); err != nil { handler.config.mu.Lock() - _ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } handler.config.mu.Lock() @@ -321,7 +321,7 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R if len(handler.config.StoreIDWitRanges) == 0 { resp = noStoreInSchedulerInfo } - _ = handler.rd.JSON(w, http.StatusOK, resp) + handler.rd.JSON(w, http.StatusOK, resp) } func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { diff --git a/server/api/admin.go b/server/api/admin.go index dd81985b514..2184dc66aa6 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -150,30 +150,30 @@ func (h *adminHandler) SavePersistFile(w http.ResponseWriter, r *http.Request) { func (h *adminHandler) MarkSnapshotRecovering(w http.ResponseWriter, _ *http.Request) { if err := h.svr.MarkSnapshotRecovering(); err != nil { - _ = h.rd.Text(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) return } - _ = h.rd.Text(w, http.StatusOK, "") + h.rd.Text(w, http.StatusOK, "") } func (h *adminHandler) IsSnapshotRecovering(w http.ResponseWriter, r *http.Request) { marked, err := h.svr.IsSnapshotRecovering(r.Context()) if err != nil { - _ = h.rd.Text(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) return } type resStruct struct { Marked bool `json:"marked"` } - _ = h.rd.JSON(w, http.StatusOK, &resStruct{Marked: marked}) + h.rd.JSON(w, http.StatusOK, &resStruct{Marked: marked}) } func (h *adminHandler) UnmarkSnapshotRecovering(w http.ResponseWriter, r *http.Request) { if err := h.svr.UnmarkSnapshotRecovering(r.Context()); err != nil { - _ = h.rd.Text(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) return } - _ = h.rd.Text(w, http.StatusOK, "") + h.rd.Text(w, http.StatusOK, "") } // RecoverAllocID recover base alloc id @@ -185,34 +185,34 @@ func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) { } idValue, ok := input["id"].(string) if !ok || len(idValue) == 0 { - _ = h.rd.Text(w, http.StatusBadRequest, "invalid id value") + h.rd.Text(w, http.StatusBadRequest, "invalid id value") return } newID, err := strconv.ParseUint(idValue, 10, 64) if err != nil { - _ = h.rd.Text(w, http.StatusBadRequest, err.Error()) + h.rd.Text(w, http.StatusBadRequest, err.Error()) return } marked, err := h.svr.IsSnapshotRecovering(r.Context()) if err != nil { 
- _ = h.rd.Text(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) return } if !marked { - _ = h.rd.Text(w, http.StatusForbidden, "can only recover alloc id when recovering mark marked") + h.rd.Text(w, http.StatusForbidden, "can only recover alloc id when recovering mark marked") return } leader := h.svr.GetLeader() if leader == nil { - _ = h.rd.Text(w, http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error()) + h.rd.Text(w, http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error()) return } if err = h.svr.RecoverAllocID(r.Context(), newID); err != nil { - _ = h.rd.Text(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) } - _ = h.rd.Text(w, http.StatusOK, "") + h.rd.Text(w, http.StatusOK, "") } func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error { diff --git a/server/api/router.go b/server/api/router.go index 553332e96af..7aef165b267 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -388,7 +388,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { // Deprecated: use /pd/api/v1/ping instead. rootRouter.HandleFunc("/ping", func(http.ResponseWriter, *http.Request) {}).Methods(http.MethodGet) - rootRouter.Walk(func(route *mux.Route, _ *mux.Router, _ []*mux.Route) error { + _ = rootRouter.Walk(func(route *mux.Route, _ *mux.Router, _ []*mux.Route) error { serviceLabel := route.GetName() methods, _ := route.GetMethods() path, _ := route.GetPathTemplate() diff --git a/server/api/store.go b/server/api/store.go index 1d0da0e9825..4bf4af1496b 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errcode" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/response" @@ -305,6 +306,10 @@ func (h *storeHandler) SetStoreWeight(w http.ResponseWriter, r *http.Request) { // @Router /store/{id}/limit [post] func (h *storeHandler) SetStoreLimit(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) + if version := rc.GetScheduleConfig().StoreLimitVersion; version != storelimit.VersionV1 { + h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("current store limit version:%s not support set limit", version)) + return + } vars := mux.Vars(r) storeID, errParse := apiutil.ParseUint64VarsField(vars, "id") if errParse != nil { @@ -354,7 +359,9 @@ func (h *storeHandler) SetStoreLimit(w http.ResponseWriter, r *http.Request) { if typ == storelimit.RemovePeer { key = fmt.Sprintf("remove-peer-%v", storeID) } - h.handler.SetStoreLimitTTL(key, ratePerMin, time.Duration(ttl)*time.Second) + if err := h.handler.SetStoreLimitTTL(key, ratePerMin, time.Duration(ttl)*time.Second); err != nil { + log.Warn("failed to set store limit", errs.ZapError(err)) + } continue } if err := h.handler.SetStoreLimit(storeID, ratePerMin, typ); err != nil { @@ -405,6 +412,11 @@ func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request) // @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /stores/limit [post] func (h *storesHandler) SetAllStoresLimit(w http.ResponseWriter, r *http.Request) { + cfg := h.GetScheduleConfig() + if version := cfg.StoreLimitVersion; version != storelimit.VersionV1 { + h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("current store limit version:%s not support get limit", version)) + return + } var input map[string]any if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return @@ -485,7 +497,12 @@ func (h *storesHandler) SetAllStoresLimit(w http.ResponseWriter, r *http.Request // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /stores/limit [get] func (h *storesHandler) GetAllStoresLimit(w http.ResponseWriter, r *http.Request) { - limits := h.GetScheduleConfig().StoreLimit + cfg := h.GetScheduleConfig() + if version := cfg.StoreLimitVersion; version != storelimit.VersionV1 { + h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("current store limit version:%s not support get limit", version)) + return + } + limits := cfg.StoreLimit includeTombstone := false var err error if includeStr := r.URL.Query().Get("include_tombstone"); includeStr != "" { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5c15856cec6..93be9d1c076 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1058,7 +1058,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - _ = ctx.MiscRunner.RunTask( + ctx.MiscRunner.RunTask( regionID, ratelimit.ObserveRegionStatsAsync, func() { @@ -1070,7 +1070,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } // region is not updated to the subtree. 
if origin.GetRef() < 2 { - _ = ctx.TaskRunner.RunTask( + ctx.TaskRunner.RunTask( regionID, ratelimit.UpdateSubTree, func() { @@ -1098,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() return err } - _ = ctx.TaskRunner.RunTask( + ctx.TaskRunner.RunTask( regionID, ratelimit.UpdateSubTree, func() { @@ -1109,7 +1109,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - _ = ctx.MiscRunner.RunTask( + ctx.MiscRunner.RunTask( regionID, ratelimit.HandleOverlaps, func() { @@ -1122,7 +1122,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats - _ = ctx.MiscRunner.RunTask( + ctx.MiscRunner.RunTask( regionID, ratelimit.CollectRegionStatsAsync, func() { @@ -1136,7 +1136,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnCollectRegionStatsFinished() if c.storage != nil { if saveKV { - _ = ctx.MiscRunner.RunTask( + ctx.MiscRunner.RunTask( regionID, ratelimit.SaveRegionToKV, func() { @@ -1590,6 +1590,19 @@ func (c *RaftCluster) setStore(store *core.StoreInfo) error { return nil } +func (c *RaftCluster) isStorePrepared() bool { + for _, store := range c.GetStores() { + if !store.IsPreparing() && !store.IsServing() { + continue + } + storeID := store.GetID() + if !c.IsStorePrepared(storeID) { + return false + } + } + return true +} + func (c *RaftCluster) checkStores() { var offlineStores []*metapb.Store var upStoreCount int @@ -1621,7 +1634,7 @@ func (c *RaftCluster) checkStores() { zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } - } else if c.IsPrepared() { + } else if c.IsPrepared() || (c.IsServiceIndependent(mcsutils.SchedulingServiceName) && c.isStorePrepared()) { threshold := c.getThreshold(stores, store) regionSize := float64(store.GetRegionSize()) log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) @@ -2175,7 +2188,9 @@ func (c *RaftCluster) runMinResolvedTSJob() { interval = c.opt.GetMinResolvedTSPersistenceInterval() if interval != 0 { if current, needPersist := c.CheckAndUpdateMinResolvedTS(); needPersist { - c.storage.SaveMinResolvedTS(current) + if err := c.storage.SaveMinResolvedTS(current); err != nil { + log.Error("persist min resolved ts meet error", errs.ZapError(err)) + } } } else { // If interval in config is zero, it means not to persist resolved ts and check config with this interval @@ -2252,8 +2267,7 @@ func (c *RaftCluster) SetExternalTS(timestamp uint64) error { c.Lock() defer c.Unlock() c.externalTS = timestamp - c.storage.SaveExternalTS(timestamp) - return nil + return c.storage.SaveExternalTS(timestamp) } // SetStoreLimit sets a store limit for a given type and rate. @@ -2289,8 +2303,8 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) } // SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl. 
-func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) { - c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl) +func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) error { + return c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl) } // GetClusterVersion returns the current cluster version. diff --git a/server/config/config.go b/server/config/config.go index 95d5dcb3257..81da5f6d9db 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -467,7 +467,9 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { c.MicroService.adjust(configMetaData.Child("micro-service")) - c.Security.Encryption.Adjust() + if err := c.Security.Encryption.Adjust(); err != nil { + return err + } c.Controller.Adjust(configMetaData.Child("controller")) @@ -579,7 +581,9 @@ func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error { } else if c.GCTunerThreshold > maxGCTunerThreshold { c.GCTunerThreshold = maxGCTunerThreshold } - c.migrateConfigurationFromFile(meta) + if err := c.migrateConfigurationFromFile(meta); err != nil { + return err + } return c.Validate() } diff --git a/server/config/config_test.go b/server/config/config_test.go index d7abfe0746a..df23241b787 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -30,12 +30,13 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/configutil" + "github.com/tikv/pd/pkg/utils/logutil" ) func TestSecurity(t *testing.T) { re := require.New(t) cfg := NewConfig() - re.False(cfg.Security.RedactInfoLog) + re.Equal(logutil.RedactInfoLogOFF, cfg.Security.RedactInfoLog) } func TestTLS(t *testing.T) { diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 6f5dc50f205..d8a7d69f783 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -795,7 +795,9 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error { func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error { cfg := &persistedConfig{Config: &Config{}} // Pass nil to initialize cfg to default values (all items undefined) - cfg.Adjust(nil, true) + if err := cfg.Adjust(nil, true); err != nil { + return err + } isExist, err := storage.LoadConfig(cfg) if err != nil { diff --git a/server/handler.go b/server/handler.go index f5d9b9035b2..adcd429ee34 100644 --- a/server/handler.go +++ b/server/handler.go @@ -346,8 +346,7 @@ func (h *Handler) SetAllStoresLimitTTL(ratePerMin float64, limitType storelimit. if err != nil { return err } - c.SetAllStoresLimitTTL(limitType, ratePerMin, ttl) - return nil + return c.SetAllStoresLimitTTL(limitType, ratePerMin, ttl) } // SetLabelStoresLimit is used to set limit of label stores. 
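With the checkStores change above, a cluster whose scheduling service is deployed independently no longer has to wait for the whole cluster to become prepared before evaluating a preparing store's progress; it is enough that every preparing or serving store has individually reported most of its regions via heartbeat. A rough, self-contained sketch of that per-store criterion follows; the helper is illustrative only, and the 0.9 factor mirrors the CollectFactor constant in pkg/core rather than being defined here.

```go
package main

import "fmt"

// storePrepared is an illustrative stand-in for the per-store readiness check
// used by RaftCluster.isStorePrepared above: a store counts as prepared once
// the regions it has reported via heartbeat (rather than only loaded from
// storage) reach a fixed fraction of its total region count.
func storePrepared(reportedRegions, totalRegions int) bool {
	const collectFactor = 0.9 // assumed to match core.CollectFactor
	return float64(reportedRegions) >= float64(totalRegions)*collectFactor
}

func main() {
	fmt.Println(storePrepared(899, 1000)) // false: still collecting heartbeats
	fmt.Println(storePrepared(900, 1000)) // true: this store can be evaluated for serving
}
```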
diff --git a/server/server.go b/server/server.go index 1d38a5ee495..a272d7d5298 100644 --- a/server/server.go +++ b/server/server.go @@ -442,9 +442,15 @@ func (s *Server) startServer(ctx context.Context) error { s.rootPath = endpoint.PDRootPath(clusterID) s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath) - s.member.SetMemberDeployPath(s.member.ID()) - s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion) - s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash) + if err := s.member.SetMemberDeployPath(s.member.ID()); err != nil { + return err + } + if err := s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion); err != nil { + return err + } + if err := s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash); err != nil { + return err + } s.idAllocator = id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -1686,7 +1692,9 @@ func (s *Server) leaderLoop() { zap.String("current-leader-member-id", types.ID(etcdLeader).String()), zap.String("transferee-member-id", types.ID(s.member.ID()).String()), ) - s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID()) + if err := s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID()); err != nil { + log.Error("failed to move etcd leader", errs.ZapError(err)) + } } } log.Info("skip campaigning of pd leader and check later", diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 82da47d18f3..0eda33130ce 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -675,3 +675,72 @@ func (suite *multipleServerTestSuite) TestReElectLeader() { rc = suite.pdLeader.GetServer().GetRaftCluster() rc.IsPrepared() } + +func (suite *serverTestSuite) TestOnlineProgress() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + rc := suite.pdLeader.GetServer().GetRaftCluster() + re.NotNil(rc) + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + for i := uint64(1); i <= 3; i++ { + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + regionLen := 1000 + regions := tests.InitRegions(regionLen) + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + re.NoError(err) + } + time.Sleep(2 * time.Second) + + // add a new store + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: 4, + Address: fmt.Sprintf("mock://%d", 4), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + + time.Sleep(2 * time.Second) + for i, r := range regions { + if i < 50 { + r.GetMeta().Peers[2].StoreId = 4 + r.GetMeta().RegionEpoch.ConfVer = 2 + r.GetMeta().RegionEpoch.Version = 2 + err = rc.HandleRegionHeartbeat(r) + re.NoError(err) + } + } + time.Sleep(2 * time.Second) + action, progress, ls, cs, err := rc.GetProgressByID("4") + re.Equal("preparing", action) + re.NotEmpty(progress) + re.NotEmpty(cs) + 
re.NotEmpty(ls) + re.NoError(err) + suite.TearDownSuite() + suite.SetupSuite() +} diff --git a/tests/registry/registry_test.go b/tests/registry/registry_test.go index 416a7420d2e..3551782d753 100644 --- a/tests/registry/registry_test.go +++ b/tests/registry/registry_test.go @@ -45,7 +45,7 @@ func (*testServiceRegistry) RegisterGRPCService(g *grpc.Server) { grpc_testing.RegisterTestServiceServer(g, &grpc_testing.UnimplementedTestServiceServer{}) } -func (*testServiceRegistry) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (*testServiceRegistry) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { group := apiutil.APIServiceGroup{ Name: "my-http-service", Version: "v1alpha1", @@ -56,7 +56,7 @@ func (*testServiceRegistry) RegisterRESTHandler(userDefineHandlers map[string]ht w.WriteHeader(http.StatusOK) w.Write([]byte("Hello World!")) }) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } func newTestServiceRegistry(_ bs.Server) registry.RegistrableService { diff --git a/tests/testutil.go b/tests/testutil.go index ea52bce310e..554def91f08 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -91,10 +91,10 @@ func SetRangePort(start, end int) { var once sync.Once // InitLogger initializes the logger for test. -func InitLogger(logConfig log.Config, logger *zap.Logger, logProps *log.ZapProperties, isRedactInfoLogEnabled bool) (err error) { +func InitLogger(logConfig log.Config, logger *zap.Logger, logProps *log.ZapProperties, redactInfoLog logutil.RedactInfoLogType) (err error) { once.Do(func() { // Setup the logger. - err = logutil.SetupLogger(logConfig, &logger, &logProps, isRedactInfoLogEnabled) + err = logutil.SetupLogger(logConfig, &logger, &logProps, redactInfoLog) if err != nil { return } @@ -438,6 +438,11 @@ func InitRegions(regionLen int) []*core.RegionInfo { {Id: allocator.alloc(), StoreId: uint64(3)}, }, } + if i == 0 { + r.StartKey = []byte{} + } else if i == regionLen-1 { + r.EndKey = []byte{} + } region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat)) // Here is used to simulate the upgrade process. 
if i < regionLen/2 { diff --git a/tools/pd-api-bench/main.go b/tools/pd-api-bench/main.go index f9feeeea580..d62d83437b6 100644 --- a/tools/pd-api-bench/main.go +++ b/tools/pd-api-bench/main.go @@ -92,7 +92,7 @@ func main() { default: log.Fatal("parse cmd flags error", zap.Error(err)) } - err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps) + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, logutil.RedactInfoLogOFF) if err == nil { log.ReplaceGlobals(cfg.Logger, cfg.LogProps) } else { @@ -199,9 +199,7 @@ func parseCaseNameAndConfig(str string) (string, *cases.Config) { strsb := strings.Split(strs[1], "+") cfg.QPS, err = strconv.ParseInt(strsb[0], 10, 64) if err != nil { - if err != nil { - log.Error("parse qps failed for case", zap.String("case", name), zap.String("config", strsb[0])) - } + log.Error("parse qps failed for case", zap.String("case", name), zap.String("config", strsb[0])) } // to get case Burst if len(strsb) > 1 { diff --git a/tools/pd-ctl/tests/store/store_test.go b/tools/pd-ctl/tests/store/store_test.go index afb97401168..b5cff2e8e5c 100644 --- a/tools/pd-ctl/tests/store/store_test.go +++ b/tools/pd-ctl/tests/store/store_test.go @@ -38,6 +38,38 @@ import ( "go.etcd.io/etcd/pkg/transport" ) +func TestStoreLimitV2(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := pdTests.NewTestCluster(ctx, 1) + re.NoError(err) + err = cluster.RunInitialServers() + re.NoError(err) + cluster.WaitLeader() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + leaderServer := cluster.GetLeaderServer() + re.NoError(leaderServer.BootstrapCluster()) + defer cluster.Destroy() + + // store command + args := []string{"-u", pdAddr, "config", "set", "store-limit-version", "v2"} + _, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + + args = []string{"-u", pdAddr, "store", "limit"} + output, err := tests.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "not support get limit") + + args = []string{"-u", pdAddr, "store", "limit", "1", "10"} + output, err = tests.ExecuteCommand(cmd, args...) 
+	re.NoError(err)
+	re.Contains(string(output), "not support set limit")
+}
+
 func TestStore(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go
index ec5e2506e6b..77ae6354bff 100644
--- a/tools/pd-heartbeat-bench/main.go
+++ b/tools/pd-heartbeat-bench/main.go
@@ -475,7 +475,7 @@ func main() {
 	}
 
 	// New zap logger
-	err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps)
+	err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, logutil.RedactInfoLogOFF)
 	if err == nil {
 		log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
 	} else {
diff --git a/tools/pd-simulator/README.md b/tools/pd-simulator/README.md
index c47024fc24b..107f6c40f64 100644
--- a/tools/pd-simulator/README.md
+++ b/tools/pd-simulator/README.md
@@ -43,3 +43,14 @@ Run a specific case with an external PD:
 ```shell
 ./pd-simulator -pd="http://127.0.0.1:2379" -case="casename"
 ```
+
+Run with tiup playground:
+```shell
+tiup playground nightly --host 127.0.0.1 --kv.binpath ./pd-simulator --kv=1 --db=0 --kv.config=./tikv.conf
+```
+The TiKV config file (`tikv.conf`) referenced above:
+```toml
+case-name="redundant-balance-region"
+sim-tick-interval="1s"
+store-io-per-second=100
+```
\ No newline at end of file
diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go
index 934cb7fd54f..63ba7f9134d 100644
--- a/tools/pd-simulator/main.go
+++ b/tools/pd-simulator/main.go
@@ -111,7 +111,7 @@ func run(simCase string, simConfig *sc.SimConfig) {
 // NewSingleServer creates a pd server for simulator.
 func NewSingleServer(ctx context.Context, simConfig *sc.SimConfig) (*server.Server, testutil.CleanupFunc) {
-	err := logutil.SetupLogger(simConfig.ServerConfig.Log, &simConfig.ServerConfig.Logger, &simConfig.ServerConfig.LogProps)
+	err := logutil.SetupLogger(simConfig.ServerConfig.Log, &simConfig.ServerConfig.Logger, &simConfig.ServerConfig.LogProps, simConfig.ServerConfig.Security.RedactInfoLog)
 	if err == nil {
 		log.ReplaceGlobals(simConfig.ServerConfig.Logger, simConfig.ServerConfig.LogProps)
 	} else {
diff --git a/tools/pd-simulator/simulator/cases/cases.go b/tools/pd-simulator/simulator/cases/cases.go
index 238b54c935a..0ddd66608b1 100644
--- a/tools/pd-simulator/simulator/cases/cases.go
+++ b/tools/pd-simulator/simulator/cases/cases.go
@@ -89,6 +89,7 @@ var IDAllocator idAllocator
 var CaseMap = map[string]func(*config.SimConfig) *Case{
 	"balance-leader":           newBalanceLeader,
 	"redundant-balance-region": newRedundantBalanceRegion,
+	"scale-in-out":             newScaleInOut,
 	"region-split":             newRegionSplit,
 	"region-merge":             newRegionMerge,
 	"hot-read":                 newHotRead,
diff --git a/tools/pd-simulator/simulator/cases/scale_tikv.go b/tools/pd-simulator/simulator/cases/scale_tikv.go
new file mode 100644
index 00000000000..96d44513ae7
--- /dev/null
+++ b/tools/pd-simulator/simulator/cases/scale_tikv.go
@@ -0,0 +1,83 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package cases + +import ( + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" + "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" +) + +func newScaleInOut(config *sc.SimConfig) *Case { + var simCase Case + + totalStore := config.TotalStore + totalRegion := config.TotalRegion + replica := int(config.ServerConfig.Replication.MaxReplicas) + if totalStore == 0 || totalRegion == 0 { + totalStore, totalRegion = 6, 4000 + } + + for i := 0; i < totalStore; i++ { + s := &Store{ + ID: IDAllocator.nextID(), + Status: metapb.StoreState_Up, + } + if i%2 == 1 { + s.HasExtraUsedSpace = true + } + simCase.Stores = append(simCase.Stores, s) + } + + for i := 0; i < totalRegion; i++ { + peers := make([]*metapb.Peer, 0, replica) + for j := 0; j < replica; j++ { + peers = append(peers, &metapb.Peer{ + Id: simutil.IDAllocator.NextID(), + StoreId: uint64((i+j)%totalStore + 1), + }) + } + simCase.Regions = append(simCase.Regions, Region{ + ID: IDAllocator.nextID(), + Peers: peers, + Leader: peers[0], + }) + } + + scaleInTick := int64(totalRegion * 3 / totalStore) + addEvent := &AddNodesDescriptor{} + addEvent.Step = func(tick int64) uint64 { + if tick == scaleInTick { + return uint64(totalStore + 1) + } + return 0 + } + + removeEvent := &DeleteNodesDescriptor{} + removeEvent.Step = func(tick int64) uint64 { + if tick == scaleInTick*2 { + return uint64(totalStore + 1) + } + return 0 + } + simCase.Events = []EventDescriptor{addEvent, removeEvent} + + simCase.Checker = func([]*metapb.Store, *core.RegionsInfo, []info.StoreStats) bool { + return false + } + return &simCase +}
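The new scale-in-out case never reports completion (its Checker always returns false), so the simulator keeps running until it is stopped; the scaling events are purely tick-driven. A small sketch of the schedule it produces with the default sizing of 6 stores and 4000 regions follows; this is illustrative arithmetic only, not part of the patch.

```go
package main

import "fmt"

// Mirrors the event schedule built by newScaleInOut: store totalStore+1 is
// added at tick totalRegion*3/totalStore and removed again at twice that tick.
func main() {
	totalStore, totalRegion := 6, 4000
	scaleInTick := int64(totalRegion * 3 / totalStore)
	for _, tick := range []int64{scaleInTick - 1, scaleInTick, scaleInTick * 2} {
		switch tick {
		case scaleInTick:
			fmt.Printf("tick %d: add store %d\n", tick, totalStore+1)
		case scaleInTick * 2:
			fmt.Printf("tick %d: remove store %d\n", tick, totalStore+1)
		default:
			fmt.Printf("tick %d: no scaling event\n", tick)
		}
	}
	// Output:
	// tick 1999: no scaling event
	// tick 2000: add store 7
	// tick 4000: remove store 7
}
```

With the CaseMap entry above, the case can be selected through the simulator's existing -case flag, for example `./pd-simulator -case="scale-in-out"`.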