From 849d80d79f423984a05f1524f746b6aa463b158d Mon Sep 17 00:00:00 2001
From: ShuNing
Date: Thu, 28 Sep 2023 15:39:20 +0800
Subject: [PATCH 1/5] api: supports GetRegion by hex key (#7160)

close tikv/pd#7159

api: supports GetRegion by hex key

Signed-off-by: nolouch

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 server/api/region.go      | 11 +++++++++++
 server/api/region_test.go |  6 ++++++
 2 files changed, 17 insertions(+)

diff --git a/server/api/region.go b/server/api/region.go
index 42b430974c4..68e280f610c 100644
--- a/server/api/region.go
+++ b/server/api/region.go
@@ -277,6 +277,17 @@ func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) {
 		h.rd.JSON(w, http.StatusBadRequest, err.Error())
 		return
 	}
+	// Decode the key from hex if the query specifies format=hex.
+	formatStr := r.URL.Query().Get("format")
+	if formatStr == "hex" {
+		keyBytes, err := hex.DecodeString(key)
+		if err != nil {
+			h.rd.JSON(w, http.StatusBadRequest, err.Error())
+			return
+		}
+		key = string(keyBytes)
+	}
+
 	regionInfo := rc.GetRegionByKey([]byte(key))
 	h.rd.JSON(w, http.StatusOK, NewAPIRegionInfo(regionInfo))
 }

diff --git a/server/api/region_test.go b/server/api/region_test.go
index acd305884d4..a39a1e5c5fd 100644
--- a/server/api/region_test.go
+++ b/server/api/region_test.go
@@ -147,6 +147,12 @@ func (suite *regionTestSuite) TestRegion() {
 	suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r2))
 	r2.Adjust()
 	suite.Equal(NewAPIRegionInfo(r), r2)
+
+	url = fmt.Sprintf("%s/region/key/%s?format=hex", suite.urlPrefix, hex.EncodeToString([]byte("a")))
+	r2 = &RegionInfo{}
+	suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r2))
+	r2.Adjust()
+	suite.Equal(NewAPIRegionInfo(r), r2)
 }
 
 func (suite *regionTestSuite) TestRegionCheck() {

From 3904a62661cdb8fe95b083c08ff4d69f55e01dc1 Mon Sep 17 00:00:00 2001
From: Hu#
Date: Thu, 28 Sep 2023 17:19:51 +0800
Subject: [PATCH 2/5] dashboard_test: extend wait time for dashboard service (#7170)

close tikv/pd#3182

Signed-off-by: husharp
---
 server/config/config.go         | 2 +-
 tests/dashboard/service_test.go | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/server/config/config.go b/server/config/config.go
index 5b9088ac8ea..0485e077c67 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -505,7 +505,7 @@ type PDServerConfig struct {
 	// RuntimeServices is the running extension services.
 	RuntimeServices typeutil.StringSlice `toml:"runtime-services" json:"runtime-services"`
 	// MetricStorage is the cluster metric storage.
-	// Currently we use prometheus as metric storage, we may use PD/TiKV as metric storage later.
+	// Currently, we use prometheus as metric storage, we may use PD/TiKV as metric storage later.
 	MetricStorage string `toml:"metric-storage" json:"metric-storage"`
 	// There are some values supported: "auto", "none", or a specific address, default: "auto"
 	DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"`
diff --git a/tests/dashboard/service_test.go b/tests/dashboard/service_test.go
index ab3a2c431cb..5f72efb2c36 100644
--- a/tests/dashboard/service_test.go
+++ b/tests/dashboard/service_test.go
@@ -86,7 +86,8 @@ func (suite *dashboardTestSuite) checkRespCode(url string, code int) {
 }
 
 func waitForConfigSync() {
-	time.Sleep(time.Second)
+	// Need to wait for the dashboard service to start.
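+	// Three seconds is an empirical allowance for slow CI runs (see tikv/pd#3182);
+	// it is not derived from any configured timeout.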
+	time.Sleep(3 * time.Second)
 }
 
 func (suite *dashboardTestSuite) checkServiceIsStarted(internalProxy bool, servers map[string]*tests.TestServer, leader *tests.TestServer) string {

From 54219d649fb4c8834cd94362a63988f3c074d33e Mon Sep 17 00:00:00 2001
From: buffer <1045931706@qq.com>
Date: Thu, 28 Sep 2023 17:42:21 +0800
Subject: [PATCH 3/5] region: fix the potential panic (#7143)

close tikv/pd#4399

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 pkg/core/region.go      | 20 ++++++++++++++++++--
 pkg/core/region_test.go | 13 +++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/pkg/core/region.go b/pkg/core/region.go
index 9e32bf7c2f5..8d0379f266f 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -23,11 +23,13 @@ import (
 	"sort"
 	"strings"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/docker/go-units"
 	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/kvproto/pkg/replication_modepb"
@@ -996,6 +998,11 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol
 
 // UpdateSubTree updates the subtree.
 func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*RegionInfo, rangeChanged bool) {
+	failpoint.Inject("UpdateSubTree", func() {
+		if origin == nil {
+			time.Sleep(time.Second)
+		}
+	})
 	r.st.Lock()
 	defer r.st.Unlock()
 	if origin != nil {
@@ -1004,8 +1011,17 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
 			// TODO: Improve performance by deleting only the different peers.
 			r.removeRegionFromSubTreeLocked(origin)
 		} else {
-			r.updateSubTreeStat(origin, region)
-			r.subRegions[region.GetID()].RegionInfo = region
+			// The region tree and the subtree are not updated atomically; the region tree is updated first.
+			// If two threads both need to update the region tree, the order of events can be:
+			// t1: thread-A updates the region tree
+			// t2: thread-B updates the region tree again
+			// t3: thread-B updates the subtree
+			// t4: thread-A updates the subtree
+			// To keep the region tree consistent with the subtree, we need to drop this stale update.
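+			// Guard against the subtree entry having been removed by a concurrent
+			// update; the old code dereferenced it unconditionally and could panic
+			// on a nil entry (tikv/pd#4399).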
+			if tree, ok := r.subRegions[region.GetID()]; ok {
+				r.updateSubTreeStat(origin, region)
+				tree.RegionInfo = region
+			}
 			return
 		}
 	}
diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go
index 5588d9190ec..50302de920e 100644
--- a/pkg/core/region_test.go
+++ b/pkg/core/region_test.go
@@ -21,6 +21,7 @@ import (
 	"strconv"
 	"testing"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/stretchr/testify/require"
@@ -450,6 +451,18 @@ func TestRegionKey(t *testing.T) {
 	}
 }
 
+func TestSetRegionConcurrence(t *testing.T) {
+	re := require.New(t)
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/core/UpdateSubTree", `return()`))
+	regions := NewRegionsInfo()
+	region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
+	go func() {
+		regions.AtomicCheckAndPutRegion(region)
+	}()
+	regions.AtomicCheckAndPutRegion(region)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
+}
+
 func TestSetRegion(t *testing.T) {
 	re := require.New(t)
 	regions := NewRegionsInfo()

From 95560408ad1c3db206559a826dadaf6cc47a584a Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Sun, 8 Oct 2023 11:34:52 +0800
Subject: [PATCH 4/5] mcs: support changing log level (#7172)

ref tikv/pd#5839

Signed-off-by: Ryan Leung

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 pkg/mcs/resourcemanager/server/apis/v1/api.go | 31 +++++++++++++++++--
 .../resourcemanager/server/grpc_service.go    |  3 +-
 pkg/mcs/resourcemanager/server/server.go      | 12 +++++++
 pkg/mcs/scheduling/server/apis/v1/api.go      | 25 +++++++++++++++
 pkg/mcs/scheduling/server/server.go           | 12 +++++++
 pkg/mcs/tso/server/apis/v1/api.go             | 18 +++++++++++
 pkg/mcs/tso/server/server.go                  | 12 +++++++
 pkg/utils/logutil/log.go                      | 10 ++++++
 server/server.go                              | 11 +------
 9 files changed, 120 insertions(+), 14 deletions(-)

diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go
index 970880788d4..ffcb9318590 100644
--- a/pkg/mcs/resourcemanager/server/apis/v1/api.go
+++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go
@@ -27,10 +27,12 @@ import (
 	"github.com/gin-gonic/gin"
 	"github.com/joho/godotenv"
 	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
+	"github.com/pingcap/log"
 	rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
 	"github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/utils/apiutil"
 	"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
+	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/reflectutil"
 )
 
@@ -57,7 +59,7 @@ func init() {
 // Service is the resource group service.
 type Service struct {
 	apiHandlerEngine *gin.Engine
-	baseEndpoint     *gin.RouterGroup
+	root             *gin.RouterGroup
 
 	manager *rmserver.Manager
 }
@@ -86,15 +88,22 @@ func NewService(srv *rmserver.Service) *Service {
 	s := &Service{
 		manager:          manager,
 		apiHandlerEngine: apiHandlerEngine,
-		baseEndpoint:     endpoint,
+		root:             endpoint,
 	}
+	s.RegisterAdminRouter()
 	s.RegisterRouter()
 	return s
 }
 
+// RegisterAdminRouter registers the router of the admin handler.
+func (s *Service) RegisterAdminRouter() {
+	router := s.root.Group("admin")
+	router.PUT("/log", changeLogLevel)
+}
+
 // RegisterRouter registers the router of the service.
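 // It mounts the /config endpoints for resource group management.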
 func (s *Service) RegisterRouter() {
-	configEndpoint := s.baseEndpoint.Group("/config")
+	configEndpoint := s.root.Group("/config")
 	configEndpoint.POST("/group", s.postResourceGroup)
 	configEndpoint.PUT("/group", s.putResourceGroup)
 	configEndpoint.GET("/group/:name", s.getResourceGroup)
@@ -110,6 +119,22 @@ func (s *Service) handler() http.Handler {
 	})
 }
 
+func changeLogLevel(c *gin.Context) {
+	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*rmserver.Service)
+	var level string
+	if err := c.Bind(&level); err != nil {
+		c.String(http.StatusBadRequest, err.Error())
+		return
+	}
+
+	if err := svr.SetLogLevel(level); err != nil {
+		c.String(http.StatusBadRequest, err.Error())
+		return
+	}
+	log.SetLevel(logutil.StringToZapLogLevel(level))
+	c.String(http.StatusOK, "The log level is updated.")
+}
+
 // postResourceGroup
 //
 // @Tags ResourceManager
diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go
index d0fac920f2f..bd46c45fa63 100644
--- a/pkg/mcs/resourcemanager/server/grpc_service.go
+++ b/pkg/mcs/resourcemanager/server/grpc_service.go
@@ -54,7 +54,8 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 // Service is the gRPC service for resource manager.
 type Service struct {
-	ctx     context.Context
+	ctx context.Context
+	*Server
 	manager *Manager
 	// settings
 }
diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go
index 78685850e86..9b9bd91c6eb 100644
--- a/pkg/mcs/resourcemanager/server/server.go
+++ b/pkg/mcs/resourcemanager/server/server.go
@@ -26,6 +26,7 @@ import (
 	"time"
 
 	grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/diagnosticspb"
 	"github.com/pingcap/kvproto/pkg/resource_manager"
@@ -86,6 +87,17 @@ func (s *Server) GetAddr() string {
 	return s.cfg.ListenAddr
 }
 
+// SetLogLevel sets log level.
+func (s *Server) SetLogLevel(level string) error {
+	if !logutil.IsLevelLegal(level) {
+		return errors.Errorf("log level %s is illegal", level)
+	}
+	s.cfg.Log.Level = level
+	log.SetLevel(logutil.StringToZapLogLevel(level))
+	log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
+	return nil
+}
+
 // Run runs the Resource Manager server.
 func (s *Server) Run() (err error) {
 	skipWaitAPIServiceReady := false
diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go
index e66bf00ef94..b34e79b9ea8 100644
--- a/pkg/mcs/scheduling/server/apis/v1/api.go
+++ b/pkg/mcs/scheduling/server/apis/v1/api.go
@@ -25,6 +25,7 @@ import (
 	"github.com/gin-contrib/pprof"
 	"github.com/gin-gonic/gin"
 	"github.com/joho/godotenv"
+	"github.com/pingcap/log"
 	scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
 	"github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/schedule"
@@ -33,6 +34,7 @@ import (
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/utils/apiutil"
 	"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
+	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/unrolled/render"
 )
 
@@ -110,12 +112,19 @@ func NewService(srv *scheserver.Service) *Service {
 		root:             root,
 		rd:               createIndentRender(),
 	}
+	s.RegisterAdminRouter()
 	s.RegisterOperatorsRouter()
 	s.RegisterSchedulersRouter()
 	s.RegisterCheckersRouter()
 	return s
 }
 
+// RegisterAdminRouter registers the router of the admin handler.
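+// Like the resource manager and TSO services, it exposes PUT admin/log for
+// changing the log level at runtime.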
+func (s *Service) RegisterAdminRouter() {
+	router := s.root.Group("admin")
+	router.PUT("/log", changeLogLevel)
+}
+
 // RegisterSchedulersRouter registers the router of the schedulers handler.
 func (s *Service) RegisterSchedulersRouter() {
 	router := s.root.Group("schedulers")
@@ -138,6 +147,22 @@ func (s *Service) RegisterOperatorsRouter() {
 	router.GET("/records", getOperatorRecords)
 }
 
+func changeLogLevel(c *gin.Context) {
+	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
+	var level string
+	if err := c.Bind(&level); err != nil {
+		c.String(http.StatusBadRequest, err.Error())
+		return
+	}
+
+	if err := svr.SetLogLevel(level); err != nil {
+		c.String(http.StatusBadRequest, err.Error())
+		return
+	}
+	log.SetLevel(logutil.StringToZapLogLevel(level))
+	c.String(http.StatusOK, "The log level is updated.")
+}
+
 // @Tags operators
 // @Summary Get an operator by ID.
 // @Param region_id path int true "A Region's Id"
diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go
index c1aecc2f18b..4ec2f2731e7 100644
--- a/pkg/mcs/scheduling/server/server.go
+++ b/pkg/mcs/scheduling/server/server.go
@@ -27,6 +27,7 @@ import (
 	"time"
 
 	grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/diagnosticspb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -119,6 +120,17 @@ func (s *Server) GetBackendEndpoints() string {
 	return s.cfg.BackendEndpoints
 }
 
+// SetLogLevel sets log level.
+func (s *Server) SetLogLevel(level string) error {
+	if !logutil.IsLevelLegal(level) {
+		return errors.Errorf("log level %s is illegal", level)
+	}
+	s.cfg.Log.Level = level
+	log.SetLevel(logutil.StringToZapLogLevel(level))
+	log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
+	return nil
+}
+
 // Run runs the scheduling server.
 func (s *Server) Run() error {
 	skipWaitAPIServiceReady := false
diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go
index c2cbca005d7..f1853bf5483 100644
--- a/pkg/mcs/tso/server/apis/v1/api.go
+++ b/pkg/mcs/tso/server/apis/v1/api.go
@@ -32,6 +32,7 @@ import (
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/utils/apiutil"
 	"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
+	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/unrolled/render"
 	"go.uber.org/zap"
 )
@@ -107,6 +108,7 @@ func NewService(srv *tsoserver.Service) *Service {
 func (s *Service) RegisterAdminRouter() {
 	router := s.root.Group("admin")
 	router.POST("/reset-ts", ResetTS)
+	router.PUT("/log", changeLogLevel)
 }
 
 // RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
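 // (Unchanged by this patch; the hunk below only adds the shared changeLogLevel handler.)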
@@ -115,6 +117,22 @@ func (s *Service) RegisterKeyspaceGroupRouter() {
 	router.GET("/members", GetKeyspaceGroupMembers)
 }
 
+func changeLogLevel(c *gin.Context) {
+	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
+	var level string
+	if err := c.Bind(&level); err != nil {
+		c.String(http.StatusBadRequest, err.Error())
+		return
+	}
+
+	if err := svr.SetLogLevel(level); err != nil {
+		c.String(http.StatusBadRequest, err.Error())
+		return
+	}
+	log.SetLevel(logutil.StringToZapLogLevel(level))
+	c.String(http.StatusOK, "The log level is updated.")
+}
+
 // ResetTSParams is the input json body params of ResetTS
 type ResetTSParams struct {
 	TSO string `json:"tso"`
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 40958ca463c..133f87b78f3 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -27,6 +27,7 @@ import (
 	"time"
 
 	grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/diagnosticspb"
 	"github.com/pingcap/kvproto/pkg/tsopb"
@@ -129,6 +130,17 @@ func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
 	s.service.RegisterGRPCService(grpcServer)
 }
 
+// SetLogLevel sets log level.
+func (s *Server) SetLogLevel(level string) error {
+	if !logutil.IsLevelLegal(level) {
+		return errors.Errorf("log level %s is illegal", level)
+	}
+	s.cfg.Log.Level = level
+	log.SetLevel(logutil.StringToZapLogLevel(level))
+	log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
+	return nil
+}
+
 // Run runs the TSO server.
 func (s *Server) Run() error {
 	skipWaitAPIServiceReady := false
diff --git a/pkg/utils/logutil/log.go b/pkg/utils/logutil/log.go
index abb6a2783a0..3dc4430b066 100644
--- a/pkg/utils/logutil/log.go
+++ b/pkg/utils/logutil/log.go
@@ -162,3 +162,13 @@ func CondUint32(key string, val uint32, condition bool) zap.Field {
 	}
 	return zap.Skip()
 }
+
+// IsLevelLegal checks whether the level is legal.
+func IsLevelLegal(level string) bool {
+	switch strings.ToLower(level) {
+	case "fatal", "error", "warn", "warning", "debug", "info":
+		return true
+	default:
+		return false
+	}
+}
diff --git a/server/server.go b/server/server.go
index d3fe5446a03..aa3ef12c4f5 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1494,7 +1494,7 @@ func (s *Server) GetClusterStatus() (*cluster.Status, error) {
 
 // SetLogLevel sets log level.
 func (s *Server) SetLogLevel(level string) error {
-	if !isLevelLegal(level) {
+	if !logutil.IsLevelLegal(level) {
 		return errors.Errorf("log level %s is illegal", level)
 	}
 	s.cfg.Log.Level = level
@@ -1503,15 +1503,6 @@ func (s *Server) SetLogLevel(level string) error {
 	return nil
 }
 
-func isLevelLegal(level string) bool {
-	switch strings.ToLower(level) {
-	case "fatal", "error", "warn", "warning", "debug", "info":
-		return true
-	default:
-		return false
-	}
-}
-
 // GetReplicationModeConfig returns the replication mode config.
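 // The returned config is a clone, so callers may mutate it freely.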
 func (s *Server) GetReplicationModeConfig() *config.ReplicationModeConfig {
 	return s.persistOptions.GetReplicationModeConfig().Clone()

From f3ed1a0ba6d566fc7b4d05f0c191533add07e1b6 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Sun, 8 Oct 2023 14:46:52 +0800
Subject: [PATCH 5/5] *: prevent store config updating periodically (#7163)

ref tikv/pd#5839

Signed-off-by: Ryan Leung
---
 pkg/mcs/scheduling/server/config/watcher.go |  3 ++
 pkg/mcs/scheduling/server/rule/watcher.go   |  8 +++++
 server/cluster/cluster.go                   | 35 ++++++++++++---------
 server/cluster/cluster_test.go              | 10 ++++--
 4 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go
index b413e243c2b..6ad37045000 100644
--- a/pkg/mcs/scheduling/server/config/watcher.go
+++ b/pkg/mcs/scheduling/server/config/watcher.go
@@ -119,6 +119,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
 				zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
 			return err
 		}
+		log.Info("update scheduling config", zap.Reflect("new", cfg))
 		cw.AdjustScheduleCfg(&cfg.Schedule)
 		cw.SetClusterVersion(&cfg.ClusterVersion)
 		cw.SetScheduleConfig(&cfg.Schedule)
@@ -146,6 +147,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
 	prefixToTrim := cw.schedulerConfigPathPrefix + "/"
 	putFn := func(kv *mvccpb.KeyValue) error {
 		name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
+		log.Info("update scheduler config", zap.String("name", string(kv.Value)))
 		err := cw.storage.SaveSchedulerConfig(name, kv.Value)
 		if err != nil {
 			log.Warn("failed to save scheduler config",
@@ -161,6 +163,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
 		return nil
 	}
 	deleteFn := func(kv *mvccpb.KeyValue) error {
+		log.Info("remove scheduler config", zap.String("key", string(kv.Key)))
 		return cw.storage.RemoveSchedulerConfig(
 			strings.TrimPrefix(string(kv.Key), prefixToTrim),
 		)
diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go
index cf0e1cd8ba1..dc5735eb540 100644
--- a/pkg/mcs/scheduling/server/rule/watcher.go
+++ b/pkg/mcs/scheduling/server/rule/watcher.go
@@ -19,10 +19,12 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.uber.org/zap"
 )
 
 // ruleStorage is an in-memory storage for Placement Rules,
@@ -163,12 +165,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
 	putFn := func(kv *mvccpb.KeyValue) error {
 		// Since the PD API server validates the rule before saving it to etcd,
 		// we can directly save the JSON rule string to the storage here.
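+		// The info-level logs added below make rule changes pushed from the API
+		// server traceable in the scheduling service's log.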
+		log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
 		return rw.ruleStore.SaveRule(
 			strings.TrimPrefix(string(kv.Key), prefixToTrim),
 			string(kv.Value),
 		)
 	}
 	deleteFn := func(kv *mvccpb.KeyValue) error {
+		log.Info("delete placement rule", zap.String("key", string(kv.Key)))
 		return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
 	}
 	postEventFn := func() error {
@@ -188,12 +192,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
 func (rw *Watcher) initializeGroupWatcher() error {
 	prefixToTrim := rw.ruleGroupPathPrefix + "/"
 	putFn := func(kv *mvccpb.KeyValue) error {
+		log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
 		return rw.ruleStore.SaveRuleGroup(
 			strings.TrimPrefix(string(kv.Key), prefixToTrim),
 			string(kv.Value),
 		)
 	}
 	deleteFn := func(kv *mvccpb.KeyValue) error {
+		log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
 		return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
 	}
 	postEventFn := func() error {
@@ -213,12 +219,14 @@ func (rw *Watcher) initializeGroupWatcher() error {
 func (rw *Watcher) initializeRegionLabelWatcher() error {
 	prefixToTrim := rw.regionLabelPathPrefix + "/"
 	putFn := func(kv *mvccpb.KeyValue) error {
+		log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
 		return rw.ruleStore.SaveRegionRule(
 			strings.TrimPrefix(string(kv.Key), prefixToTrim),
 			string(kv.Value),
 		)
 	}
 	deleteFn := func(kv *mvccpb.KeyValue) error {
+		log.Info("delete region label rule", zap.String("key", string(kv.Key)))
 		return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
 	}
 	postEventFn := func() error {
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 30ab50f0c9f..33ed5711ac5 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -427,15 +427,15 @@ func (c *RaftCluster) runStoreConfigSync() {
 	defer c.wg.Done()
 
 	var (
-		synced, switchRaftV2Config bool
-		stores                     = c.GetStores()
+		synced, switchRaftV2Config, needPersist bool
+		stores                                  = c.GetStores()
 	)
 	// Start the ticker with a second-level timer to accelerate
 	// the bootstrap stage.
 	ticker := time.NewTicker(time.Minute)
 	defer ticker.Stop()
 	for {
-		synced, switchRaftV2Config = c.syncStoreConfig(stores)
+		synced, switchRaftV2Config, needPersist = c.syncStoreConfig(stores)
 		if switchRaftV2Config {
 			if err := c.opt.SwitchRaftV2(c.GetStorage()); err != nil {
 				log.Warn("store config persisted failed", zap.Error(err))
@@ -444,8 +444,11 @@ func (c *RaftCluster) runStoreConfigSync() {
 		// Update the stores if the synchronization is not completed.
 		if !synced {
 			stores = c.GetStores()
-		} else if err := c.opt.Persist(c.storage); err != nil {
-			log.Warn("store config persisted failed", zap.Error(err))
+		}
+		if needPersist {
+			if err := c.opt.Persist(c.storage); err != nil {
+				log.Warn("store config persisted failed", zap.Error(err))
+			}
 		}
 		select {
 		case <-c.ctx.Done():
@@ -459,7 +462,8 @@ func (c *RaftCluster) runStoreConfigSync() {
 // syncStoreConfig syncs the store config from TiKV.
 // - `synced` is true if the config is synced from at least one TiKV.
 // - `switchRaftV2` is true if the config of the TiKV engine is changed to raft-kv2.
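+// - `needPersist` is true if the fetched config differs from the current one
+//   and therefore needs to be persisted by the caller.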
-func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
+func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool, needPersist bool) {
+	var err error
 	for index := 0; index < len(stores); index++ {
 		select {
 		case <-c.ctx.Done():
@@ -479,7 +483,7 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
 		}
 		// It will try the next store if the current store fails.
 		address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
-		switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
+		switchRaftV2, needPersist, err = c.observeStoreConfig(c.ctx, address)
 		if err != nil {
 			// Delete the store if it failed and retry the next store.
 			stores = append(stores[:index], stores[index+1:]...)
@@ -492,34 +496,35 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
 		}
 		storeSyncConfigEvent.WithLabelValues(address, "succ").Inc()
 
-		return true, switchRaftV2
+		return true, switchRaftV2, needPersist
 	}
-	return false, false
+	return false, false, needPersist
 }
 
 // observeStoreConfig is used to observe the store config changes and
 // return whether the new config changes the engine to raft-kv2.
-func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
+func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (switchRaftV2 bool, needPersist bool, err error) {
 	cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
 	if err != nil {
-		return false, err
+		return false, false, err
 	}
 	oldCfg := c.opt.GetStoreConfig()
 	if cfg == nil || oldCfg.Equal(cfg) {
-		return false, nil
+		return false, false, nil
 	}
 	log.Info("sync the store config successful",
 		zap.String("store-address", address),
 		zap.String("store-config", cfg.String()),
 		zap.String("old-config", oldCfg.String()))
-	return c.updateStoreConfig(oldCfg, cfg)
+	return c.updateStoreConfig(oldCfg, cfg), true, nil
 }
 
 // updateStoreConfig updates the store config. This is extracted for testing.
-func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) {
+func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (switchRaftV2 bool) {
 	cfg.Adjust()
 	c.opt.SetStoreConfig(cfg)
-	return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil
+	switchRaftV2 = oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2
+	return
 }
 
 // fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 33236c5d40c..31f6bb357c3 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -1428,8 +1428,10 @@ func TestSyncConfigContext(t *testing.T) {
 	// trip schema header
 	now := time.Now()
 	stores[0].GetMeta().StatusAddress = server.URL[7:]
-	synced, _ := tc.syncStoreConfig(tc.GetStores())
+	synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores())
 	re.False(synced)
+	re.False(switchRaftV2)
+	re.False(needPersist)
 	re.Less(time.Since(now), clientTimeout*2)
 }
 
@@ -1450,15 +1452,17 @@ func TestStoreConfigSync(t *testing.T) {
 	re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV", `return("10MiB")`))
 	// switchRaftV2 will be true.
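+	// needPersist will be true as well, since the mocked config differs from the current one.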
- synced, switchRaftV2 := tc.syncStoreConfig(tc.GetStores()) + synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores()) re.True(synced) re.True(switchRaftV2) + re.True(needPersist) re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize()) re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize()) // switchRaftV2 will be false this time. - synced, switchRaftV2 = tc.syncStoreConfig(tc.GetStores()) + synced, switchRaftV2, needPersist = tc.syncStoreConfig(tc.GetStores()) re.True(synced) re.False(switchRaftV2) + re.False(needPersist) re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize()) re.NoError(opt.Persist(tc.GetStorage())) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV"))
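
A minimal sketch of how a client might exercise the two new HTTP surfaces from this series: the hex-format region lookup from patch 1 and the runtime log-level endpoint from patch 4. This is not part of the patches; the addresses and the path prefixes (/pd/api/v1 for PD, /scheduling/api/v1 for the scheduling microservice) are assumptions to adjust for your deployment. The JSON-string body matches what changeLogLevel binds via c.Bind(&level).

package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Patch 1: look up a region by a hex-encoded key. The format=hex query
	// parameter tells PD to hex-decode the key before the lookup.
	key := hex.EncodeToString([]byte("a"))
	// Assumed PD address and API prefix.
	resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:2379/pd/api/v1/region/key/%s?format=hex", key))
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body))

	// Patch 4: change a microservice's log level at runtime. The handler binds
	// the request body to a plain string, so send a JSON-encoded string.
	// Assumed scheduling service address and API prefix.
	req, err := http.NewRequest(http.MethodPut,
		"http://127.0.0.1:3379/scheduling/api/v1/admin/log",
		bytes.NewBufferString(`"debug"`))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
}

The same PUT admin/log shape applies to the resource manager and TSO services, since all three register the identical changeLogLevel handler.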