diff --git a/.golangci.yml b/.golangci.yml index d4ce8edb65e..c70c0996cb3 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -207,6 +207,6 @@ issues: - path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump) linters: - errcheck - - path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/syncer/server.go) + - path: (pkg/schedule/schedulers/split_bucket.go|server/api/.*\.go|pkg/schedule/schedulers/balance_leader.go) linters: - errcheck diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index bd75309ed8a..2de5b702253 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -196,8 +196,12 @@ func start(cmd *cobra.Command, args []string, services ...string) { schedulers.Register() cfg := config.NewConfig() flagSet := cmd.Flags() - flagSet.Parse(args) - err := cfg.Parse(flagSet) + err := flagSet.Parse(args) + if err != nil { + cmd.Println(err) + return + } + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { @@ -231,7 +235,9 @@ func start(cmd *cobra.Command, args []string, services ...string) { log.Fatal("initialize logger error", errs.ZapError(err)) } // Flushing any buffered log entries - defer log.Sync() + defer func() { + _ = log.Sync() + }() memory.InitMemoryHook() if len(services) != 0 { versioninfo.Log(server.APIServiceMode) @@ -295,6 +301,6 @@ func start(cmd *cobra.Command, args []string, services ...string) { } func exit(code int) { - log.Sync() + _ = log.Sync() os.Exit(code) } diff --git a/pkg/mcs/metastorage/server/grpc_service.go b/pkg/mcs/metastorage/server/grpc_service.go index f018dc72f9f..7969173f11f 100644 --- a/pkg/mcs/metastorage/server/grpc_service.go +++ b/pkg/mcs/metastorage/server/grpc_service.go @@ -47,7 +47,7 @@ type dummyRestService struct{} func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotImplemented) - w.Write([]byte("not implemented")) + _, _ = w.Write([]byte("not implemented")) } // Service is the gRPC service for meta storage. @@ -71,9 +71,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. -func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } func (s *Service) checkServing() error { diff --git a/pkg/mcs/registry/registry.go b/pkg/mcs/registry/registry.go index 02e49f63d75..c6470b34dc4 100644 --- a/pkg/mcs/registry/registry.go +++ b/pkg/mcs/registry/registry.go @@ -37,7 +37,7 @@ type ServiceBuilder func(bs.Server) RegistrableService // RegistrableService is the interface that should wraps the RegisterService method. type RegistrableService interface { RegisterGRPCService(g *grpc.Server) - RegisterRESTHandler(userDefineHandlers map[string]http.Handler) + RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error } // ServiceRegistry is a map that stores all registered grpc services. @@ -82,14 +82,20 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http for name, builder := range r.builders { serviceName := createServiceName(prefix, name) if l, ok := r.services[serviceName]; ok { - l.RegisterRESTHandler(h) - log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name)) + if err := l.RegisterRESTHandler(h); err != nil { + log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + } else { + log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name)) + } continue } l := builder(srv) r.services[serviceName] = l - l.RegisterRESTHandler(h) - log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) + if err := l.RegisterRESTHandler(h); err != nil { + log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + } else { + log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) + } } } diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 70862ffb89c..83197b70f2e 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -237,7 +237,9 @@ func (c *Config) Adjust(meta *toml.MetaData) error { } c.adjustLog(configMetaData.Child("log")) - c.Security.Encryption.Adjust() + if err := c.Security.Encryption.Adjust(); err != nil { + return err + } c.Controller.Adjust(configMetaData.Child("controller")) configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index 2f35042c48f..d503893061d 100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -49,7 +49,7 @@ type dummyRestService struct{} func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotImplemented) - w.Write([]byte("not implemented")) + _, _ = w.Write([]byte("not implemented")) } // Service is the gRPC service for resource manager. @@ -76,9 +76,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. -func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } // GetManager returns the resource manager. @@ -228,6 +228,8 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu log.Debug("finish token request from", zap.String("resource-group", resourceGroupName)) resps.Responses = append(resps.Responses, resp) } - stream.Send(resps) + if err := stream.Send(resps); err != nil { + return errors.WithStack(err) + } } } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 418d188823f..13e46ea0bba 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -349,7 +349,9 @@ func (m *Manager) persistResourceGroupRunningState() { m.RLock() group, ok := m.groups[keys[idx]] if ok { - group.persistStates(m.storage) + if err := group.persistStates(m.storage); err != nil { + log.Error("persist resource group state failed", zap.Error(err)) + } } m.RUnlock() } diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 1fac592f791..06e1b61d757 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -180,7 +180,9 @@ func (s *Server) campaignLeader() { log.Info("triggering the primary callback functions") for _, cb := range s.primaryCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to trigger the primary callback function", errs.ZapError(err)) + } } s.participant.EnableLeader() @@ -213,7 +215,9 @@ func (s *Server) Close() { } log.Info("closing resource manager server ...") - s.serviceRegister.Deregister() + if err := s.serviceRegister.Deregister(); err != nil { + log.Error("failed to deregister the service", errs.ZapError(err)) + } utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() @@ -362,10 +366,14 @@ func CreateServer(ctx context.Context, cfg *Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { - cmd.Flags().Parse(args) + err := cmd.Flags().Parse(args) + if err != nil { + cmd.Println(err) + return + } cfg := NewConfig() flagSet := cmd.Flags() - err := cfg.Parse(flagSet) + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { @@ -389,7 +397,9 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { log.Fatal("initialize logger error", errs.ZapError(err)) } // Flushing any buffered log entries - defer log.Sync() + defer func() { + _ = log.Sync() + }() versioninfo.Log(serviceName) log.Info("resource manager config", zap.Reflect("config", cfg)) diff --git a/pkg/mcs/resourcemanager/server/testutil.go b/pkg/mcs/resourcemanager/server/testutil.go index 3de0e32c0ab..9b96def27ff 100644 --- a/pkg/mcs/resourcemanager/server/testutil.go +++ b/pkg/mcs/resourcemanager/server/testutil.go @@ -32,7 +32,9 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*S re.NoError(err) log.ReplaceGlobals(cfg.Logger, cfg.LogProps) // Flushing any buffered log entries - defer log.Sync() + defer func() { + _ = log.Sync() + }() s := CreateServer(ctx, cfg) if err = s.Run(); err != nil { diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 9dc6590a0b4..2111aa3ddcc 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -146,7 +146,9 @@ func (c *Config) adjust(meta *toml.MetaData) error { } c.adjustLog(configMetaData.Child("log")) - c.Security.Encryption.Adjust() + if err := c.Security.Encryption.Adjust(); err != nil { + return err + } configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 842e876885c..93af46356b7 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -53,7 +53,7 @@ type dummyRestService struct{} func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotImplemented) - w.Write([]byte("not implemented")) + _, _ = w.Write([]byte("not implemented")) } // ConfigProvider is used to get scheduling config from the given @@ -336,9 +336,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. -func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader { diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 47a7cf9962b..fa129cb54ec 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -316,7 +316,9 @@ func (s *Server) Close() { } log.Info("closing scheduling server ...") - s.serviceRegister.Deregister() + if err := s.serviceRegister.Deregister(); err != nil { + log.Error("failed to deregister the service", errs.ZapError(err)) + } utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() @@ -563,10 +565,14 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { schedulers.Register() - cmd.Flags().Parse(args) + err := cmd.Flags().Parse(args) + if err != nil { + cmd.Println(err) + return + } cfg := config.NewConfig() flagSet := cmd.Flags() - err := cfg.Parse(flagSet) + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { @@ -590,7 +596,9 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { log.Fatal("initialize logger error", errs.ZapError(err)) } // Flushing any buffered log entries - defer log.Sync() + defer func() { + _ = log.Sync() + }() versioninfo.Log(serviceName) log.Info("scheduling service config", zap.Reflect("config", cfg)) diff --git a/pkg/mcs/scheduling/server/testutil.go b/pkg/mcs/scheduling/server/testutil.go index 74baac44808..a7d6ae0e0a0 100644 --- a/pkg/mcs/scheduling/server/testutil.go +++ b/pkg/mcs/scheduling/server/testutil.go @@ -33,7 +33,9 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Conf re.NoError(err) log.ReplaceGlobals(cfg.Logger, cfg.LogProps) // Flushing any buffered log entries - defer log.Sync() + defer func() { + _ = log.Sync() + }() s := CreateServer(ctx, cfg) if err = s.Run(); err != nil { diff --git a/pkg/mcs/tso/server/config.go b/pkg/mcs/tso/server/config.go index 06e9054e117..8cfef98ebaf 100644 --- a/pkg/mcs/tso/server/config.go +++ b/pkg/mcs/tso/server/config.go @@ -224,9 +224,7 @@ func (c *Config) Adjust(meta *toml.MetaData) error { } c.adjustLog(configMetaData.Child("log")) - c.Security.Encryption.Adjust() - - return nil + return c.Security.Encryption.Adjust() } func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 0a075a45c05..a8abb0bc8fd 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -49,7 +49,7 @@ type dummyRestService struct{} func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotImplemented) - w.Write([]byte("not implemented")) + _, _ = w.Write([]byte("not implemented")) } // ConfigProvider is used to get tso config from the given @@ -78,9 +78,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) { } // RegisterRESTHandler registers the service to REST server. -func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { handler, group := SetUpRestHandler(s) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } // Tso returns a stream of timestamps diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c38c7142730..558fde8f474 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -174,7 +174,9 @@ func (s *Server) Close() { log.Info("closing tso server ...") // close tso service loops in the keyspace group manager s.keyspaceGroupManager.Close() - s.serviceRegister.Deregister() + if err := s.serviceRegister.Deregister(); err != nil { + log.Error("failed to deregister the service", errs.ZapError(err)) + } utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() @@ -435,10 +437,14 @@ func CreateServer(ctx context.Context, cfg *Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { - cmd.Flags().Parse(args) + err := cmd.Flags().Parse(args) + if err != nil { + cmd.Println(err) + return + } cfg := NewConfig() flagSet := cmd.Flags() - err := cfg.Parse(flagSet) + err = cfg.Parse(flagSet) defer logutil.LogPanic() if err != nil { @@ -462,7 +468,9 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { log.Fatal("initialize logger error", errs.ZapError(err)) } // Flushing any buffered log entries - defer log.Sync() + defer func() { + _ = log.Sync() + }() versioninfo.Log(serviceName) log.Info("TSO service config", zap.Reflect("config", cfg)) diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 5f6b212529b..fd55874ce30 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -468,31 +468,31 @@ func (m *ModeManager) tickUpdateState() { case drStateSync: // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. if !canSync && hasMajority { - m.drSwitchToAsyncWait(storeIDs[primaryUp]) + _ = m.drSwitchToAsyncWait(storeIDs[primaryUp]) } case drStateAsyncWait: if canSync { - m.drSwitchToSync() + _ = m.drSwitchToSync() break } if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) { - m.drSwitchToAsyncWait(storeIDs[primaryUp]) + _ = m.drSwitchToAsyncWait(storeIDs[primaryUp]) break } if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { - m.drSwitchToAsync(storeIDs[primaryUp]) + _ = m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration { - m.drSwitchToSyncRecover() + _ = m.drSwitchToSyncRecover() break } if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { - m.drSwitchToAsync(storeIDs[primaryUp]) + _ = m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateSyncRecover: if !canSync && hasMajority { - m.drSwitchToAsync(storeIDs[primaryUp]) + _ = m.drSwitchToAsync(storeIDs[primaryUp]) } else { m.updateProgress() progress := m.estimateProgress() @@ -500,7 +500,7 @@ func (m *ModeManager) tickUpdateState() { drRecoverProgressGauge.Set(float64(progress)) if progress == 1.0 { - m.drSwitchToSync() + _ = m.drSwitchToSync() } else { m.updateRecoverProgress(progress) } diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index d19a4f70d66..bbaada98924 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -87,8 +87,7 @@ func TestStatus(t *testing.T) { }, }, rep.GetReplicationStatus()) - err = rep.drSwitchToAsync(nil) - re.NoError(err) + re.NoError(rep.drSwitchToAsync(nil)) re.Equal(&pb.ReplicationStatus{ Mode: pb.ReplicationMode_DR_AUTO_SYNC, DrAutoSync: &pb.DRAutoSync{ @@ -99,8 +98,7 @@ func TestStatus(t *testing.T) { }, }, rep.GetReplicationStatus()) - err = rep.drSwitchToSyncRecover() - re.NoError(err) + re.NoError(rep.drSwitchToSyncRecover()) stateID := rep.drAutoSync.StateID re.Equal(&pb.ReplicationStatus{ Mode: pb.ReplicationMode_DR_AUTO_SYNC, @@ -327,7 +325,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() - rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) + re.NoError(rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5})) rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour) rep.tickUpdateState() re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout @@ -346,14 +344,14 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateAsync, rep.drGetState()) assertStateIDUpdate() // lost majority, does not switch to async. - rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) assertStateIDUpdate() setStoreState(cluster, "down", "down", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) // sync_recover -> sync - rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) assertStateIDUpdate() setStoreState(cluster, "up", "up", "up", "up", "up", "up") cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5) @@ -500,7 +498,7 @@ func TestRecoverProgress(t *testing.T) { re.NoError(err) prepare := func(n int, asyncRegions []int) { - rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) regions := genRegions(cluster, rep.drAutoSync.StateID, n) for _, i := range asyncRegions { regions[i] = regions[i].Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ @@ -565,7 +563,7 @@ func TestRecoverProgressWithSplitAndMerge(t *testing.T) { re.NoError(err) prepare := func(n int, asyncRegions []int) { - rep.drSwitchToSyncRecover() + re.NoError(rep.drSwitchToSyncRecover()) regions := genRegions(cluster, rep.drAutoSync.StateID, n) for _, i := range asyncRegions { regions[i] = regions[i].Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ diff --git a/pkg/storage/endpoint/gc_safe_point.go b/pkg/storage/endpoint/gc_safe_point.go index c2f09980651..85b29e0b47e 100644 --- a/pkg/storage/endpoint/gc_safe_point.go +++ b/pkg/storage/endpoint/gc_safe_point.go @@ -100,7 +100,9 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf } if ssp.ExpiredAt < now.Unix() { - se.Remove(key) + if err := se.Remove(key); err != nil { + log.Error("failed to remove expired service safepoint", errs.ZapError(err)) + } continue } if ssp.SafePoint < min.SafePoint { diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index ccc32b13303..132b06aec69 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -282,7 +282,10 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync RegionLeaders: leaders, Buckets: buckets, } - s.limit.WaitN(ctx, resp.Size()) + if err := s.limit.WaitN(ctx, resp.Size()); err != nil { + log.Error("failed to wait rate limit", errs.ZapError(err)) + return err + } lastIndex += len(metas) if err := stream.Send(resp); err != nil { log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err)) diff --git a/pkg/tso/admin.go b/pkg/tso/admin.go index bc9fd1f853d..6626d1d2714 100644 --- a/pkg/tso/admin.go +++ b/pkg/tso/admin.go @@ -73,12 +73,12 @@ func (h *AdminHandler) ResetTS(w http.ResponseWriter, r *http.Request) { } tsValue, ok := input["tso"].(string) if !ok || len(tsValue) == 0 { - h.rd.JSON(w, http.StatusBadRequest, "invalid tso value") + _ = h.rd.JSON(w, http.StatusBadRequest, "invalid tso value") return } ts, err := strconv.ParseUint(tsValue, 10, 64) if err != nil { - h.rd.JSON(w, http.StatusBadRequest, "invalid tso value") + _ = h.rd.JSON(w, http.StatusBadRequest, "invalid tso value") return } @@ -86,7 +86,7 @@ func (h *AdminHandler) ResetTS(w http.ResponseWriter, r *http.Request) { forceUseLargerVal, contains := input["force-use-larger"] if contains { if forceUseLarger, ok = forceUseLargerVal.(bool); !ok { - h.rd.JSON(w, http.StatusBadRequest, "invalid force-use-larger value") + _ = h.rd.JSON(w, http.StatusBadRequest, "invalid force-use-larger value") return } } @@ -97,17 +97,17 @@ func (h *AdminHandler) ResetTS(w http.ResponseWriter, r *http.Request) { if err = handler.ResetTS(ts, ignoreSmaller, skipUpperBoundCheck, 0); err != nil { if err == errs.ErrServerNotStarted { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + _ = h.rd.JSON(w, http.StatusInternalServerError, err.Error()) } else if err == errs.ErrEtcdTxnConflict { // If the error is ErrEtcdTxnConflict, it means there is a temporary failure. // Return 503 to let the client retry. // Ref: https://datatracker.ietf.org/doc/html/rfc7231#section-6.6.4 - h.rd.JSON(w, http.StatusServiceUnavailable, + _ = h.rd.JSON(w, http.StatusServiceUnavailable, fmt.Sprintf("It's a temporary failure with error %s, please retry.", err.Error())) } else { - h.rd.JSON(w, http.StatusForbidden, err.Error()) + _ = h.rd.JSON(w, http.StatusForbidden, err.Error()) } return } - h.rd.JSON(w, http.StatusOK, "Reset ts successfully.") + _ = h.rd.JSON(w, http.StatusOK, "Reset ts successfully.") } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5c15856cec6..05925eb1720 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2175,7 +2175,9 @@ func (c *RaftCluster) runMinResolvedTSJob() { interval = c.opt.GetMinResolvedTSPersistenceInterval() if interval != 0 { if current, needPersist := c.CheckAndUpdateMinResolvedTS(); needPersist { - c.storage.SaveMinResolvedTS(current) + if err := c.storage.SaveMinResolvedTS(current); err != nil { + log.Error("persist min resolved ts meet error", errs.ZapError(err)) + } } } else { // If interval in config is zero, it means not to persist resolved ts and check config with this interval @@ -2252,8 +2254,7 @@ func (c *RaftCluster) SetExternalTS(timestamp uint64) error { c.Lock() defer c.Unlock() c.externalTS = timestamp - c.storage.SaveExternalTS(timestamp) - return nil + return c.storage.SaveExternalTS(timestamp) } // SetStoreLimit sets a store limit for a given type and rate. @@ -2289,8 +2290,8 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) } // SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl. -func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) { - c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl) +func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) error { + return c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl) } // GetClusterVersion returns the current cluster version. diff --git a/server/config/config.go b/server/config/config.go index 95d5dcb3257..81da5f6d9db 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -467,7 +467,9 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { c.MicroService.adjust(configMetaData.Child("micro-service")) - c.Security.Encryption.Adjust() + if err := c.Security.Encryption.Adjust(); err != nil { + return err + } c.Controller.Adjust(configMetaData.Child("controller")) @@ -579,7 +581,9 @@ func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error { } else if c.GCTunerThreshold > maxGCTunerThreshold { c.GCTunerThreshold = maxGCTunerThreshold } - c.migrateConfigurationFromFile(meta) + if err := c.migrateConfigurationFromFile(meta); err != nil { + return err + } return c.Validate() } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 6f5dc50f205..d8a7d69f783 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -795,7 +795,9 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error { func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error { cfg := &persistedConfig{Config: &Config{}} // Pass nil to initialize cfg to default values (all items undefined) - cfg.Adjust(nil, true) + if err := cfg.Adjust(nil, true); err != nil { + return err + } isExist, err := storage.LoadConfig(cfg) if err != nil { diff --git a/server/handler.go b/server/handler.go index f5d9b9035b2..adcd429ee34 100644 --- a/server/handler.go +++ b/server/handler.go @@ -346,8 +346,7 @@ func (h *Handler) SetAllStoresLimitTTL(ratePerMin float64, limitType storelimit. if err != nil { return err } - c.SetAllStoresLimitTTL(limitType, ratePerMin, ttl) - return nil + return c.SetAllStoresLimitTTL(limitType, ratePerMin, ttl) } // SetLabelStoresLimit is used to set limit of label stores. diff --git a/server/server.go b/server/server.go index 1d38a5ee495..a272d7d5298 100644 --- a/server/server.go +++ b/server/server.go @@ -442,9 +442,15 @@ func (s *Server) startServer(ctx context.Context) error { s.rootPath = endpoint.PDRootPath(clusterID) s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath) - s.member.SetMemberDeployPath(s.member.ID()) - s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion) - s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash) + if err := s.member.SetMemberDeployPath(s.member.ID()); err != nil { + return err + } + if err := s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion); err != nil { + return err + } + if err := s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash); err != nil { + return err + } s.idAllocator = id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -1686,7 +1692,9 @@ func (s *Server) leaderLoop() { zap.String("current-leader-member-id", types.ID(etcdLeader).String()), zap.String("transferee-member-id", types.ID(s.member.ID()).String()), ) - s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID()) + if err := s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID()); err != nil { + log.Error("failed to move etcd leader", errs.ZapError(err)) + } } } log.Info("skip campaigning of pd leader and check later", diff --git a/tests/registry/registry_test.go b/tests/registry/registry_test.go index 416a7420d2e..3551782d753 100644 --- a/tests/registry/registry_test.go +++ b/tests/registry/registry_test.go @@ -45,7 +45,7 @@ func (*testServiceRegistry) RegisterGRPCService(g *grpc.Server) { grpc_testing.RegisterTestServiceServer(g, &grpc_testing.UnimplementedTestServiceServer{}) } -func (*testServiceRegistry) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { +func (*testServiceRegistry) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error { group := apiutil.APIServiceGroup{ Name: "my-http-service", Version: "v1alpha1", @@ -56,7 +56,7 @@ func (*testServiceRegistry) RegisterRESTHandler(userDefineHandlers map[string]ht w.WriteHeader(http.StatusOK) w.Write([]byte("Hello World!")) }) - apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) + return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } func newTestServiceRegistry(_ bs.Server) registry.RegistrableService {