From f33af88fb704c8f7067d5384ae19f4c452a810a2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 27 Dec 2024 11:35:25 +0800 Subject: [PATCH] remove api mode concept Signed-off-by: Ryan Leung --- client/client.go | 2 +- client/http/client.go | 2 +- client/inner_client.go | 2 +- .../servicediscovery/pd_service_discovery.go | 4 +- cmd/pd-server/main.go | 14 ++--- pkg/mcs/registry/registry.go | 8 +-- pkg/mcs/scheduling/server/cluster.go | 18 +++---- pkg/mcs/scheduling/server/config/config.go | 6 +-- pkg/mcs/scheduling/server/config/watcher.go | 4 +- pkg/mcs/scheduling/server/grpc_service.go | 2 +- pkg/mcs/scheduling/server/meta/watcher.go | 4 +- pkg/mcs/scheduling/server/rule/watcher.go | 4 +- pkg/mcs/scheduling/server/server.go | 8 +-- pkg/mcs/utils/constant/constant.go | 4 +- pkg/member/election_leader.go | 2 +- pkg/schedule/coordinator.go | 2 +- .../schedulers/scheduler_controller.go | 2 +- pkg/tso/keyspace_group_manager.go | 2 +- pkg/utils/apiutil/serverapi/middleware.go | 4 +- server/api/admin.go | 2 +- server/api/server.go | 4 +- server/apiv2/handlers/micro_service.go | 4 +- server/cluster/cluster.go | 22 ++++---- server/config/config.go | 2 +- server/grpc_service.go | 2 +- server/server.go | 22 ++++---- server/server_test.go | 4 +- tests/autoscaling/autoscaling_test.go | 2 +- tests/cluster.go | 42 ++++++--------- tests/compatibility/version_upgrade_test.go | 4 +- tests/dashboard/race_test.go | 2 +- tests/integrations/client/client_test.go | 36 ++++++------- tests/integrations/client/client_tls_test.go | 4 +- tests/integrations/client/http_client_test.go | 2 +- .../mcs/discovery/register_test.go | 8 +-- .../mcs/keyspace/tso_keyspace_group_test.go | 2 +- tests/integrations/mcs/members/member_test.go | 2 +- .../resourcemanager/resource_manager_test.go | 2 +- tests/integrations/mcs/scheduling/api_test.go | 46 ++++++++-------- .../mcs/scheduling/config_test.go | 15 +++--- .../integrations/mcs/scheduling/meta_test.go | 3 +- .../integrations/mcs/scheduling/rule_test.go | 4 +- .../mcs/scheduling/server_test.go | 38 ++++++------- tests/integrations/mcs/tso/api_test.go | 8 +-- .../mcs/tso/keyspace_group_manager_test.go | 14 ++--- tests/integrations/mcs/tso/proxy_test.go | 2 +- tests/integrations/mcs/tso/server_test.go | 54 +++++++++---------- tests/integrations/tso/client_test.go | 8 +-- tests/integrations/tso/consistency_test.go | 2 +- tests/integrations/tso/server_test.go | 2 +- tests/registry/registry_test.go | 2 +- tests/server/api/api_test.go | 10 ++-- tests/server/api/scheduler_test.go | 2 +- tests/server/apiv2/handlers/keyspace_test.go | 2 +- tests/server/apiv2/handlers/ready_test.go | 2 +- .../apiv2/handlers/tso_keyspace_group_test.go | 2 +- tests/server/cluster/cluster_test.go | 40 +++++++------- tests/server/cluster/cluster_work_test.go | 6 +-- tests/server/config/config_test.go | 2 +- tests/server/id/id_test.go | 8 +-- tests/server/join/join_fail/join_fail_test.go | 2 +- tests/server/join/join_test.go | 8 +-- tests/server/member/member_test.go | 10 ++-- tests/server/server_test.go | 10 ++-- tests/server/tso/tso_test.go | 4 +- tests/testutil.go | 28 +++++----- tools/pd-backup/tests/backup_test.go | 2 +- tools/pd-ctl/pdctl/command/config_command.go | 24 ++++----- tools/pd-ctl/tests/config/config_test.go | 50 ++++++++--------- .../tests/keyspace/keyspace_group_test.go | 14 ++--- tools/pd-ctl/tests/keyspace/keyspace_test.go | 4 +- 71 files changed, 341 insertions(+), 349 deletions(-) diff --git a/client/client.go b/client/client.go index 2fc9bd3ef0d..4291b0afb4f 100644 --- a/client/client.go +++ b/client/client.go @@ -535,7 +535,7 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi // GetMinTS implements the TSOClient interface. func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) { - // Handle compatibility issue in case of PD/API server doesn't support GetMinTS API. + // Handle compatibility issue in case of PD/PD service doesn't support GetMinTS API. serviceMode := c.inner.getServiceMode() switch serviceMode { case pdpb.ServiceMode_UNKNOWN_SVC_MODE: diff --git a/client/http/client.go b/client/http/client.go index fa9801cf764..87746e3bcea 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -245,7 +245,7 @@ func (ci *clientInner) doRequest( if readErr != nil { logFields = append(logFields, zap.NamedError("read-body-error", err)) } else { - // API server will return a JSON body containing the detailed error message + // PD service will return a JSON body containing the detailed error message // when the status code is not `http.StatusOK` 200. bs = bytes.TrimSpace(bs) logFields = append(logFields, zap.ByteString("body", bs)) diff --git a/client/inner_client.go b/client/inner_client.go index 91f999dd3b5..94ce622d721 100644 --- a/client/inner_client.go +++ b/client/inner_client.go @@ -129,7 +129,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { c.tsoSvcDiscovery = newTSOSvcDiscovery // Close the old TSO service discovery safely after both the old client and service discovery are replaced. if oldTSOSvcDiscovery != nil { - // We are switching from API service mode to PD service mode, so delete the old tso microservice discovery. + // We are switching from PD service mode to PD service mode, so delete the old tso microservice discovery. oldTSOSvcDiscovery.Close() } } diff --git a/client/servicediscovery/pd_service_discovery.go b/client/servicediscovery/pd_service_discovery.go index 619d4196408..a95b57f9469 100644 --- a/client/servicediscovery/pd_service_discovery.go +++ b/client/servicediscovery/pd_service_discovery.go @@ -407,7 +407,7 @@ var ( _ TSOEventSource = (*pdServiceDiscovery)(nil) ) -// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based +// pdServiceDiscovery is the service discovery client of PD/PD service which is quorum based type pdServiceDiscovery struct { isInitialized bool @@ -670,7 +670,7 @@ func (c *pdServiceDiscovery) SetKeyspaceID(keyspaceID uint32) { // GetKeyspaceGroupID returns the ID of the keyspace group func (*pdServiceDiscovery) GetKeyspaceGroupID() uint32 { - // PD/API service only supports the default keyspace group + // PD/PD service only supports the default keyspace group return constants.DefaultKeyspaceGroupID } diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 165bcd2a12f..24ca46e7d5e 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -82,7 +82,7 @@ func NewServiceCommand() *cobra.Command { } cmd.AddCommand(NewTSOServiceCommand()) cmd.AddCommand(NewSchedulingServiceCommand()) - cmd.AddCommand(NewAPIServiceCommand()) + cmd.AddCommand(NewPDServiceCommand()) return cmd } @@ -128,12 +128,12 @@ func NewSchedulingServiceCommand() *cobra.Command { return cmd } -// NewAPIServiceCommand returns the API service command. -func NewAPIServiceCommand() *cobra.Command { +// NewPDServiceCommand returns the PD service command. +func NewPDServiceCommand() *cobra.Command { cmd := &cobra.Command{ Use: apiMode, - Short: "Run the API service", - Run: createAPIServerWrapper, + Short: "Run the PD service", + Run: createPDServiceWrapper, } addFlags(cmd) return cmd @@ -160,7 +160,7 @@ func addFlags(cmd *cobra.Command) { cmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster") } -func createAPIServerWrapper(cmd *cobra.Command, args []string) { +func createPDServiceWrapper(cmd *cobra.Command, args []string) { start(cmd, args, cmd.CalledAs()) } @@ -219,7 +219,7 @@ func start(cmd *cobra.Command, args []string, services ...string) { defer log.Sync() memory.InitMemoryHook() if len(services) != 0 { - versioninfo.Log(server.APIServiceMode) + versioninfo.Log(server.PDServiceMode) } else { versioninfo.Log(server.PDMode) } diff --git a/pkg/mcs/registry/registry.go b/pkg/mcs/registry/registry.go index 6a01f091e52..2ffa04b1bf9 100644 --- a/pkg/mcs/registry/registry.go +++ b/pkg/mcs/registry/registry.go @@ -85,18 +85,18 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http serviceName := createServiceName(prefix, name) if l, ok := r.services[serviceName]; ok { if err := l.RegisterRESTHandler(h); err != nil { - log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) } else { - log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name)) + log.Info("restful PD service already registered", zap.String("prefix", prefix), zap.String("service-name", name)) } continue } l := builder(srv) r.services[serviceName] = l if err := l.RegisterRESTHandler(h); err != nil { - log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) } else { - log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) + log.Info("restful PD service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) } } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 5c7166fba09..5d23ccbf95e 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -55,7 +55,7 @@ type Cluster struct { storage storage.Storage coordinator *schedule.Coordinator checkMembershipCh chan struct{} - apiServerLeader atomic.Value + pdLeader atomic.Value running atomic.Bool // heartbeatRunner is used to process the subtree update task asynchronously. @@ -227,7 +227,7 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf // AllocID allocates a new ID. func (c *Cluster) AllocID() (uint64, error) { - client, err := c.getAPIServerLeaderClient() + client, err := c.getPDLeaderClient() if err != nil { return 0, err } @@ -241,11 +241,11 @@ func (c *Cluster) AllocID() (uint64, error) { return resp.GetId(), nil } -func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { - cli := c.apiServerLeader.Load() +func (c *Cluster) getPDLeaderClient() (pdpb.PDClient, error) { + cli := c.pdLeader.Load() if cli == nil { c.triggerMembershipCheck() - return nil, errors.New("API server leader is not found") + return nil, errors.New("PD leader is not found") } return cli.(pdpb.PDClient), nil } @@ -257,10 +257,10 @@ func (c *Cluster) triggerMembershipCheck() { } } -// SwitchAPIServerLeader switches the API server leader. -func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { - old := c.apiServerLeader.Load() - return c.apiServerLeader.CompareAndSwap(old, new) +// SwitchPDLeader switches the PD server leader. +func (c *Cluster) SwitchPDLeader(new pdpb.PDClient) bool { + old := c.pdLeader.Load() + return c.pdLeader.CompareAndSwap(old, new) } func trySend(notifier chan struct{}) { diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 784d1f45a82..d7c92d05a18 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -243,7 +243,7 @@ func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig { o.SetClusterVersion(&cfg.ClusterVersion) o.schedule.Store(&cfg.Schedule) o.replication.Store(&cfg.Replication) - // storeConfig will be fetched from TiKV by PD API server, + // storeConfig will be fetched from TiKV by PD PD service, // so we just set an empty value here first. o.storeConfig.Store(&sc.StoreConfig{}) o.ttl = ttl @@ -748,11 +748,11 @@ func (o *PersistConfig) IsRaftKV2() bool { // TODO: implement the following methods // AddSchedulerCfg adds the scheduler configurations. -// This method is a no-op since we only use configurations derived from one-way synchronization from API server now. +// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now. func (*PersistConfig) AddSchedulerCfg(types.CheckerSchedulerType, []string) {} // RemoveSchedulerCfg removes the scheduler configurations. -// This method is a no-op since we only use configurations derived from one-way synchronization from API server now. +// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now. func (*PersistConfig) RemoveSchedulerCfg(types.CheckerSchedulerType) {} // CheckLabelProperty checks if the label property is satisfied. diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index f499a0d7d50..9db2d47d0f4 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -36,7 +36,7 @@ import ( "github.com/tikv/pd/pkg/utils/keypath" ) -// Watcher is used to watch the PD API server for any configuration changes. +// Watcher is used to watch the PD service for any configuration changes. type Watcher struct { wg sync.WaitGroup ctx context.Context @@ -76,7 +76,7 @@ type persistedConfig struct { Store sc.StoreConfig `json:"store"` } -// NewWatcher creates a new watcher to watch the config meta change from PD API server. +// NewWatcher creates a new watcher to watch the config meta change from PD service. func NewWatcher( ctx context.Context, etcdClient *clientv3.Client, diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 3d1183bf734..bd2cc40c21d 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -159,7 +159,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat region := core.RegionFromHeartbeat(request, 0) err = c.HandleRegionHeartbeat(region) if err != nil { - // TODO: if we need to send the error back to API server. + // TODO: if we need to send the error back to PD service. log.Error("failed handle region heartbeat", zap.Error(err)) continue } diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 27fe6687f3d..c51f10027d7 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -33,7 +33,7 @@ import ( "github.com/tikv/pd/pkg/utils/keypath" ) -// Watcher is used to watch the PD API server for any meta changes. +// Watcher is used to watch the PD service for any meta changes. type Watcher struct { wg sync.WaitGroup ctx context.Context @@ -48,7 +48,7 @@ type Watcher struct { storeWatcher *etcdutil.LoopWatcher } -// NewWatcher creates a new watcher to watch the meta change from PD API server. +// NewWatcher creates a new watcher to watch the meta change from PD service. func NewWatcher( ctx context.Context, etcdClient *clientv3.Client, diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index cc6480a0cb4..014a3abc2be 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -34,7 +34,7 @@ import ( "github.com/tikv/pd/pkg/utils/keypath" ) -// Watcher is used to watch the PD API server for any Placement Rule changes. +// Watcher is used to watch the PD service for any Placement Rule changes. type Watcher struct { ctx context.Context cancel context.CancelFunc @@ -74,7 +74,7 @@ type Watcher struct { patch *placement.RuleConfigPatch } -// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server. +// NewWatcher creates a new watcher to watch the Placement Rule change from PD service. func NewWatcher( ctx context.Context, etcdClient *clientv3.Client, diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 8c9972d5eec..6890745fc35 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -110,7 +110,7 @@ type Server struct { hbStreams *hbstream.HeartbeatStreams storage *endpoint.StorageEndpoint - // for watching the PD API server meta info updates that are related to the scheduling. + // for watching the PD service meta info updates that are related to the scheduling. configWatcher *config.Watcher ruleWatcher *rule.Watcher metaWatcher *meta.Watcher @@ -169,10 +169,10 @@ func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) s.serverLoopWg.Add(2) go s.primaryElectionLoop() - go s.updateAPIServerMemberLoop() + go s.updatePDServiceMemberLoop() } -func (s *Server) updateAPIServerMemberLoop() { +func (s *Server) updatePDServiceMemberLoop() { defer logutil.LogPanic() defer s.serverLoopWg.Done() @@ -220,7 +220,7 @@ func (s *Server) updateAPIServerMemberLoop() { // double check break } - if s.cluster.SwitchAPIServerLeader(pdpb.NewPDClient(cc)) { + if s.cluster.SwitchPDLeader(pdpb.NewPDClient(cc)) { if status.Leader != curLeader { log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0])) } diff --git a/pkg/mcs/utils/constant/constant.go b/pkg/mcs/utils/constant/constant.go index 87fcf29f678..6f684bdb977 100644 --- a/pkg/mcs/utils/constant/constant.go +++ b/pkg/mcs/utils/constant/constant.go @@ -57,8 +57,8 @@ const ( // MicroserviceRootPath is the root path of microservice in etcd. MicroserviceRootPath = "/ms" - // APIServiceName is the name of api server. - APIServiceName = "api" + // PDServiceName is the name of pd server. + PDServiceName = "pd" // TSOServiceName is the name of tso server. TSOServiceName = "tso" // SchedulingServiceName is the name of scheduling server. diff --git a/pkg/member/election_leader.go b/pkg/member/election_leader.go index 81afc5dbd0a..2e5769d7dc4 100644 --- a/pkg/member/election_leader.go +++ b/pkg/member/election_leader.go @@ -21,7 +21,7 @@ import ( ) // ElectionLeader defines the common interface of the leader, which is the pdpb.Member -// for in PD/API service or the tsopb.Participant in the micro services. +// for in PD/PD service or the tsopb.Participant in the micro services. type ElectionLeader interface { // GetListenUrls returns the listen urls GetListenUrls() []string diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index e792560cb37..80299bf1e25 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -389,7 +389,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) { return } log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) - // TODO: handle the plugin in API service mode. + // TODO: handle the plugin in PD service mode. if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err)) return diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 28973631570..5f461d326c5 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -55,7 +55,7 @@ type Controller struct { // and used in the PD leader service mode now. schedulers map[string]*ScheduleController // schedulerHandlers is used to manage the HTTP handlers of schedulers, - // which will only be initialized and used in the API service mode now. + // which will only be initialized and used in the PD service mode now. schedulerHandlers map[string]http.Handler opController *operator.Controller } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index d22d284e1be..b20be027f5c 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -334,7 +334,7 @@ type KeyspaceGroupManager struct { // Value: discover.ServiceRegistryEntry tsoServiceKey string // legacySvcRootPath defines the legacy root path for all etcd paths which derives from - // the PD/API service. It's in the format of "/pd/{cluster_id}". + // the PD/PD service. It's in the format of "/pd/{cluster_id}". // The main paths for different usages include: // 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the // storage endpoint. diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 85b958a5554..823deed64ea 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, } func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) { - if !h.s.IsAPIServiceMode() { + if !h.s.IsPDServiceMode() { return false, "" } if len(h.microserviceRedirectRules) == 0 { @@ -223,7 +223,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http clientUrls = leader.GetClientUrls() r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name()) } else { - // Prevent more than one redirection among PD/API servers. + // Prevent more than one redirection among PD/PD service. log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader)) http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) return diff --git a/server/api/admin.go b/server/api/admin.go index d2be53cf40e..561f4ec4bff 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -254,5 +254,5 @@ func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error { } func buildMsg(err error) string { - return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + return fmt.Sprintf("This operation was executed in PD service but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) } diff --git a/server/api/server.go b/server/api/server.go index 1a744635e2d..8e352b6a36e 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -63,7 +63,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP // Following requests are **not** redirected: // "/schedulers", http.MethodPost // "/schedulers/{name}", http.MethodDelete - // Because the writing of all the config of the scheduling service is in the API server, + // Because the writing of all the config of the scheduling service is in the PD service, // we should not post and delete the scheduler directly in the scheduling service. router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), @@ -153,7 +153,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/config/placement-rule", constant.SchedulingServiceName, []string{http.MethodGet}), - // because the writing of all the meta information of the scheduling service is in the API server, + // because the writing of all the meta information of the scheduling service is in the PD service, // we should not post and delete the scheduler directly in the scheduling service. serverapi.MicroserviceRedirectRule( prefix+"/schedulers", diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index c7fa0dc94f2..b4d3d6bbe89 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -39,7 +39,7 @@ func RegisterMicroService(r *gin.RouterGroup) { // @Router /ms/members/{service} [get] func GetMembers(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - if !svr.IsAPIServiceMode() { + if !svr.IsPDServiceMode() { c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") return } @@ -65,7 +65,7 @@ func GetMembers(c *gin.Context) { // @Router /ms/primary/{service} [get] func GetPrimary(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - if !svr.IsAPIServiceMode() { + if !svr.IsPDServiceMode() { c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") return } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 699b43e7901..494234b4c41 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -131,7 +131,7 @@ type Server interface { GetMembers() ([]*pdpb.Member, error) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error GetKeyspaceGroupManager() *keyspace.GroupManager - IsAPIServiceMode() bool + IsPDServiceMode() bool GetSafePointV2Manager() *gc.SafePointV2Manager } @@ -156,12 +156,12 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - isAPIServiceMode bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS atomic.Value // Store as uint64 - externalTS atomic.Value // Store as uint64 + running bool + isPDServiceMode bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS atomic.Value // Store as uint64 + externalTS atomic.Value // Store as uint64 // Keep the previous store limit settings when removing a store. prevStoreLimit map[uint64]map[storelimit.Type]float64 @@ -325,7 +325,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { log.Warn("raft cluster has already been started") return nil } - c.isAPIServiceMode = s.IsAPIServiceMode() + c.isPDServiceMode = s.IsPDServiceMode() err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) if err != nil { return err @@ -376,7 +376,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { c.loadExternalTS() c.loadMinResolvedTS() - if c.isAPIServiceMode { + if c.isPDServiceMode { // bootstrap keyspace group manager after starting other parts successfully. // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. err = c.keyspaceGroupManager.Bootstrap(c.ctx) @@ -404,7 +404,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { } func (c *RaftCluster) checkSchedulingService() { - if c.isAPIServiceMode { + if c.isPDServiceMode { servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName) if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { c.startSchedulingJobs(c, c.hbstreams) @@ -425,7 +425,7 @@ func (c *RaftCluster) checkSchedulingService() { // checkTSOService checks the TSO service. func (c *RaftCluster) checkTSOService() { - if c.isAPIServiceMode { + if c.isPDServiceMode { if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) if err != nil || len(servers) == 0 { diff --git a/server/config/config.go b/server/config/config.go index 282b5264fe9..69cd76409bc 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -860,7 +860,7 @@ func (c *MicroServiceConfig) Clone() *MicroServiceConfig { return &cfg } -// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to api service. +// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to PD service. func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { return c.EnableSchedulingFallback } diff --git a/server/grpc_service.go b/server/grpc_service.go index 398325cd30a..9f80c0bd849 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -272,7 +272,7 @@ func (s *GrpcServer) GetClusterInfo(context.Context, *pdpb.GetClusterInfoRequest } // GetMinTS implements gRPC PDServer. In PD service mode, it simply returns a timestamp. -// In API service mode, it queries all tso servers and gets the minimum timestamp across +// In PD service mode, it queries all tso servers and gets the minimum timestamp across // all keyspace groups. func (s *GrpcServer) GetMinTS( ctx context.Context, request *pdpb.GetMinTSRequest, diff --git a/server/server.go b/server/server.go index 3f397da4d0b..2aae8fe7c2c 100644 --- a/server/server.go +++ b/server/server.go @@ -101,8 +101,8 @@ const ( // PDMode represents that server is in PD mode. PDMode = "PD" - // APIServiceMode represents that server is in API service mode. - APIServiceMode = "API Service" + // PDServiceMode represents that server is in PD service mode. + PDServiceMode = "PD Service" // maxRetryTimesGetServicePrimary is the max retry times for getting primary addr. // Note: it need to be less than client.defaultPDTimeout @@ -243,7 +243,7 @@ type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APISer func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) { var mode string if len(services) != 0 { - mode = APIServiceMode + mode = PDServiceMode } else { mode = PDMode } @@ -478,7 +478,7 @@ func (s *Server) startServer(ctx context.Context) error { Member: s.member.MemberValue(), Step: keyspace.AllocStep, }) - if s.IsAPIServiceMode() { + if s.IsPDServiceMode() { s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) } s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) @@ -529,7 +529,7 @@ func (s *Server) Close() { s.cgMonitor.StopMonitor() s.stopServerLoop() - if s.IsAPIServiceMode() { + if s.IsPDServiceMode() { s.keyspaceGroupManager.Close() } @@ -640,7 +640,7 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.etcdLeaderLoop() go s.serverMetricsLoop() go s.encryptionKeyManagerLoop() - if s.IsAPIServiceMode() { + if s.IsPDServiceMode() { s.initTSOPrimaryWatcher() s.initSchedulingPrimaryWatcher() } @@ -787,9 +787,9 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } -// IsAPIServiceMode return whether the server is in API service mode. -func (s *Server) IsAPIServiceMode() bool { - return s.mode == APIServiceMode +// IsPDServiceMode return whether the server is in PD service mode. +func (s *Server) IsPDServiceMode() bool { + return s.mode == PDServiceMode } // GetAddr returns the server urls for clients. @@ -1389,7 +1389,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { - if s.mode == APIServiceMode && !s.IsClosed() { + if s.mode == PDServiceMode && !s.IsClosed() { if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { return true } @@ -1666,7 +1666,7 @@ func (s *Server) campaignLeader() { log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name())) if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { - log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode), + log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/PD service may campaign successfully", s.mode), zap.String("campaign-leader-name", s.Name())) } else { log.Error(fmt.Sprintf("campaign %s leader meets error due to etcd error", s.mode), diff --git a/server/server_test.go b/server/server_test.go index 23da2078cb2..28839b89389 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -266,13 +266,13 @@ func TestAPIService(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() mockHandler := CreateMockHandler(re, "127.0.0.1") - svr, err := CreateServer(ctx, cfg, []string{constant.APIServiceName}, mockHandler) + svr, err := CreateServer(ctx, cfg, []string{constant.PDServiceName}, mockHandler) re.NoError(err) defer svr.Close() err = svr.Run() re.NoError(err) MustWaitLeader(re, []*Server{svr}) - re.True(svr.IsAPIServiceMode()) + re.True(svr.IsPDServiceMode()) } func TestIsPathInDirectory(t *testing.T) { diff --git a/tests/autoscaling/autoscaling_test.go b/tests/autoscaling/autoscaling_test.go index c00bc00afe0..d95ecf216df 100644 --- a/tests/autoscaling/autoscaling_test.go +++ b/tests/autoscaling/autoscaling_test.go @@ -35,7 +35,7 @@ func TestAPI(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() diff --git a/tests/cluster.go b/tests/cluster.go index a4f445155e1..18e85b4c933 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -79,13 +79,8 @@ type TestServer struct { var zapLogOnce sync.Once // NewTestServer creates a new TestServer. -func NewTestServer(ctx context.Context, cfg *config.Config) (*TestServer, error) { - return createTestServer(ctx, cfg, nil) -} - -// NewTestAPIServer creates a new TestServer. -func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, error) { - return createTestServer(ctx, cfg, []string{constant.APIServiceName}) +func NewTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) { + return createTestServer(ctx, cfg, services) } func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) { @@ -435,15 +430,15 @@ type ConfigOption func(conf *config.Config, serverName string) // NewTestCluster creates a new TestCluster. func NewTestCluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { - return createTestCluster(ctx, initialServerCount, false, opts...) + return createTestCluster(ctx, initialServerCount, nil, opts...) } -// NewTestAPICluster creates a new TestCluster with API service. -func NewTestAPICluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { - return createTestCluster(ctx, initialServerCount, true, opts...) +// NewTestMSCluster creates a new TestCluster with PD service. +func NewTestMSCluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { + return createTestCluster(ctx, initialServerCount, []string{constant.PDServiceName}, opts...) } -func createTestCluster(ctx context.Context, initialServerCount int, isAPIServiceMode bool, opts ...ConfigOption) (*TestCluster, error) { +func createTestCluster(ctx context.Context, initialServerCount int, services []string, opts ...ConfigOption) (*TestCluster, error) { schedulers.Register() config := newClusterConfig(initialServerCount) servers := make(map[string]*TestServer) @@ -452,12 +447,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService if err != nil { return nil, err } - var s *TestServer - if isAPIServiceMode { - s, err = NewTestAPIServer(ctx, serverConf) - } else { - s, err = NewTestServer(ctx, serverConf) - } + s, err := NewTestServer(ctx, serverConf, services) if err != nil { return nil, err } @@ -481,7 +471,7 @@ func RestartTestAPICluster(ctx context.Context, cluster *TestCluster) (*TestClus } func restartTestCluster( - ctx context.Context, cluster *TestCluster, isAPIServiceMode bool, + ctx context.Context, cluster *TestCluster, isPDServiceMode bool, ) (newTestCluster *TestCluster, err error) { schedulers.Register() newTestCluster = &TestCluster{ @@ -508,10 +498,10 @@ func restartTestCluster( newServer *TestServer serverErr error ) - if isAPIServiceMode { - newServer, serverErr = NewTestAPIServer(ctx, serverCfg) + if isPDServiceMode { + newServer, serverErr = NewTestServer(ctx, serverCfg, []string{constant.PDServiceName}) } else { - newServer, serverErr = NewTestServer(ctx, serverCfg) + newServer, serverErr = NewTestServer(ctx, serverCfg, nil) } serverMap.Store(serverName, newServer) errorMap.Store(serverName, serverErr) @@ -735,7 +725,7 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ if err != nil { return nil, err } - s, err := NewTestServer(ctx, conf) + s, err := NewTestServer(ctx, conf, nil) if err != nil { return nil, err } @@ -743,13 +733,13 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ return s, nil } -// JoinAPIServer is used to add a new TestAPIServer into the cluster. -func (c *TestCluster) JoinAPIServer(ctx context.Context, opts ...ConfigOption) (*TestServer, error) { +// JoinPDServer is used to add a new TestServer into the cluster. +func (c *TestCluster) JoinPDServer(ctx context.Context, opts ...ConfigOption) (*TestServer, error) { conf, err := c.config.join().Generate(opts...) if err != nil { return nil, err } - s, err := NewTestAPIServer(ctx, conf) + s, err := NewTestServer(ctx, conf, []string{constant.PDServiceName}) if err != nil { return nil, err } diff --git a/tests/compatibility/version_upgrade_test.go b/tests/compatibility/version_upgrade_test.go index 7b5fd05071b..9658abbec16 100644 --- a/tests/compatibility/version_upgrade_test.go +++ b/tests/compatibility/version_upgrade_test.go @@ -33,7 +33,7 @@ func TestStoreRegister(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() @@ -88,7 +88,7 @@ func TestRollingUpgrade(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() err = cluster.RunInitialServers() diff --git a/tests/dashboard/race_test.go b/tests/dashboard/race_test.go index 6d117760f8f..c69000bf0b1 100644 --- a/tests/dashboard/race_test.go +++ b/tests/dashboard/race_test.go @@ -32,7 +32,7 @@ func TestCancelDuringStarting(t *testing.T) { defer cancel() re := require.New(t) - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index fadfb952e4c..4742eb38027 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -79,7 +79,7 @@ func TestClientLeaderChange(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) defer cluster.Destroy() @@ -145,7 +145,7 @@ func TestLeaderTransferAndMoveCluster(t *testing.T) { }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) defer cluster.Destroy() @@ -227,7 +227,7 @@ func TestGetTSAfterTransferLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 2, nil) re.NoError(err) endpoints := runServer(re, cluster) leader := cluster.WaitLeader() @@ -261,7 +261,7 @@ func TestTSOFollowerProxy(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) defer cluster.Destroy() @@ -335,7 +335,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestAPICluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, []string{constant.PDServiceName}) re.NoError(err) defer cluster.Destroy() err = cluster.RunInitialServers() @@ -362,7 +362,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) defer cluster.Destroy() @@ -425,7 +425,7 @@ func TestCustomTimeout(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() @@ -460,7 +460,7 @@ func (suite *followerForwardAndHandleTestSuite) SetupSuite() { re := suite.Require() suite.ctx, suite.clean = context.WithCancel(context.Background()) sd.MemberHealthCheckInterval = 100 * time.Millisecond - cluster, err := tests.NewTestCluster(suite.ctx, 3) + cluster, err := tests.NewTestCluster(suite.ctx, 3, nil) re.NoError(err) suite.cluster = cluster suite.endpoints = runServer(re, cluster) @@ -876,7 +876,7 @@ func TestConfigTTLAfterTransferLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) defer cluster.Destroy() err = cluster.RunInitialServers() @@ -923,7 +923,7 @@ func TestCloseClient(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() endpoints := runServer(re, cluster) @@ -1634,7 +1634,7 @@ func TestWatch(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() endpoints := runServer(re, cluster) @@ -1677,7 +1677,7 @@ func TestPutGet(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() endpoints := runServer(re, cluster) @@ -1712,7 +1712,7 @@ func TestClientWatchWithRevision(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() endpoints := runServer(re, cluster) @@ -1779,7 +1779,7 @@ func (suite *clientTestSuite) TestMemberUpdateBackOff() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) defer cluster.Destroy() @@ -2007,7 +2007,7 @@ func TestGetRegionWithBackoff(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() endpoints := runServer(re, cluster) @@ -2057,7 +2057,7 @@ func TestCircuitBreaker(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() @@ -2110,7 +2110,7 @@ func TestCircuitBreakerOpenAndChangeSettings(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() @@ -2158,7 +2158,7 @@ func TestCircuitBreakerHalfOpenAndChangeSettings(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() diff --git a/tests/integrations/client/client_tls_test.go b/tests/integrations/client/client_tls_test.go index 40a36205de0..b4aaebd8f7b 100644 --- a/tests/integrations/client/client_tls_test.go +++ b/tests/integrations/client/client_tls_test.go @@ -127,7 +127,7 @@ func testTLSReload( revertFunc func()) { tlsInfo := cloneFunc() // 1. start cluster with valid certs - clus, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + clus, err := tests.NewTestCluster(ctx, 1, nil, func(conf *config.Config, _ string) { conf.Security.TLSConfig = grpcutil.TLSConfig{ KeyPath: tlsInfo.KeyFile, CertPath: tlsInfo.CertFile, @@ -291,7 +291,7 @@ func TestMultiCN(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clus, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + clus, err := tests.NewTestCluster(ctx, 1, nil, func(conf *config.Config, _ string) { conf.Security.TLSConfig = grpcutil.TLSConfig{ KeyPath: testTLSInfo.KeyFile, CertPath: testTLSInfo.CertFile, diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 0cbf12261f3..be616459fbf 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -78,7 +78,7 @@ func (suite *httpClientTestSuite) SetupSuite() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)")) suite.ctx, suite.cancelFunc = context.WithCancel(context.Background()) - cluster, err := tests.NewTestCluster(suite.ctx, 2) + cluster, err := tests.NewTestCluster(suite.ctx, 2, nil) re.NoError(err) err = cluster.RunInitialServers() diff --git a/tests/integrations/mcs/discovery/register_test.go b/tests/integrations/mcs/discovery/register_test.go index da6fa158307..552dc2a2218 100644 --- a/tests/integrations/mcs/discovery/register_test.go +++ b/tests/integrations/mcs/discovery/register_test.go @@ -54,7 +54,7 @@ func (suite *serverRegisterTestSuite) SetupSuite() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1, []string{constant.PDServiceName}) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -84,7 +84,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) { addr := s.GetAddr() client := suite.pdLeader.GetEtcdClient() - // test API server discovery + // test PD service discovery endpoints, err := discovery.Discover(client, serviceName) re.NoError(err) @@ -98,7 +98,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) { re.True(exist) re.Equal(expectedPrimary, primary) - // test API server discovery after unregister + // test PD service discovery after unregister cleanup() endpoints, err = discovery.Discover(client, serviceName) re.NoError(err) @@ -140,7 +140,7 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin delete(serverMap, primary) expectedPrimary = tests.WaitForPrimaryServing(re, serverMap) - // test API server discovery + // test PD service discovery client := suite.pdLeader.GetEtcdClient() endpoints, err := discovery.Discover(client, serviceName) re.NoError(err) diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 44347b4757d..3324052de41 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -60,7 +60,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestAPICluster(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1, []string{constant.PDServiceName}) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index 28275849073..b1d186780f2 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -64,7 +64,7 @@ func (suite *memberTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestAPICluster(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1, []string{constant.PDServiceName}) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 2eb66df2e2b..1f607e6fcf3 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -75,7 +75,7 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() { suite.ctx, suite.clean = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestCluster(suite.ctx, 2) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 2, nil) re.NoError(err) err = suite.cluster.RunInitialServers() diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index abace06bb78..9b6b3d95145 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -56,7 +56,7 @@ func (suite *apiTestSuite) TearDownSuite() { } func (suite *apiTestSuite) TestGetCheckerByName() { - suite.env.RunTestInAPIMode(suite.checkGetCheckerByName) + suite.env.RunTestInPDServiceMode(suite.checkGetCheckerByName) } func (suite *apiTestSuite) checkGetCheckerByName(cluster *tests.TestCluster) { @@ -102,7 +102,7 @@ func (suite *apiTestSuite) checkGetCheckerByName(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestAPIForward() { - suite.env.RunTestInAPIMode(suite.checkAPIForward) + suite.env.RunTestInPDServiceMode(suite.checkAPIForward) } func (suite *apiTestSuite) checkAPIForward(cluster *tests.TestCluster) { @@ -378,7 +378,7 @@ func (suite *apiTestSuite) checkAPIForward(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestConfig() { - suite.env.RunTestInAPIMode(suite.checkConfig) + suite.env.RunTestInPDServiceMode(suite.checkConfig) } func (suite *apiTestSuite) checkConfig(cluster *tests.TestCluster) { @@ -401,7 +401,7 @@ func (suite *apiTestSuite) checkConfig(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestConfigForward() { - suite.env.RunTestInAPIMode(suite.checkConfigForward) + suite.env.RunTestInPDServiceMode(suite.checkConfigForward) } func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { @@ -413,7 +413,7 @@ func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", addr) // Test config forward - // Expect to get same config in scheduling server and api server + // Expect to get same config in scheduling server and PD service testutil.Eventually(re, func() bool { testutil.ReadGetJSON(re, tests.TestDialClient, urlPrefix, &cfg) re.Equal(cfg["schedule"].(map[string]any)["leader-schedule-limit"], @@ -421,8 +421,8 @@ func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { return cfg["replication"].(map[string]any)["max-replicas"] == float64(opts.GetReplicationConfig().MaxReplicas) }) - // Test to change config in api server - // Expect to get new config in scheduling server and api server + // Test to change config in PD service + // Expect to get new config in scheduling server and PD service reqData, err := json.Marshal(map[string]any{ "max-replicas": 4, }) @@ -436,7 +436,7 @@ func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { }) // Test to change config only in scheduling server - // Expect to get new config in scheduling server but not old config in api server + // Expect to get new config in scheduling server but not old config in PD service scheCfg := opts.GetScheduleConfig().Clone() scheCfg.LeaderScheduleLimit = 100 opts.SetScheduleConfig(scheCfg) @@ -452,7 +452,7 @@ func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestAdminRegionCache() { - suite.env.RunTestInAPIMode(suite.checkAdminRegionCache) + suite.env.RunTestInPDServiceMode(suite.checkAdminRegionCache) } func (suite *apiTestSuite) checkAdminRegionCache(cluster *tests.TestCluster) { @@ -479,7 +479,7 @@ func (suite *apiTestSuite) checkAdminRegionCache(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestAdminRegionCacheForward() { - suite.env.RunTestInAPIMode(suite.checkAdminRegionCacheForward) + suite.env.RunTestInPDServiceMode(suite.checkAdminRegionCacheForward) } func (suite *apiTestSuite) checkAdminRegionCacheForward(cluster *tests.TestCluster) { @@ -491,22 +491,22 @@ func (suite *apiTestSuite) checkAdminRegionCacheForward(cluster *tests.TestClust r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) tests.MustPutRegionInfo(re, cluster, r3) - apiServer := cluster.GetLeaderServer().GetServer() + pdServer := cluster.GetLeaderServer().GetServer() schedulingServer := cluster.GetSchedulingPrimaryServer() re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) - re.Equal(3, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(3, pdServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{})) addr := cluster.GetLeaderServer().GetAddr() urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", addr) err := testutil.CheckDelete(tests.TestDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) re.NoError(err) re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) - re.Equal(2, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(2, pdServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{})) err = testutil.CheckDelete(tests.TestDialClient, urlPrefix+"s", testutil.StatusOK(re)) re.NoError(err) re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) - re.Equal(0, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(0, pdServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{})) } func (suite *apiTestSuite) TestFollowerForward() { @@ -520,7 +520,7 @@ func (suite *apiTestSuite) checkFollowerForward(cluster *tests.TestCluster) { leaderAddr := cluster.GetLeaderServer().GetAddr() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - follower, err := cluster.JoinAPIServer(ctx) + follower, err := cluster.JoinPDServer(ctx) re.NoError(err) re.NoError(follower.Run()) re.NotEmpty(cluster.WaitLeader()) @@ -558,7 +558,7 @@ func (suite *apiTestSuite) checkFollowerForward(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestMetrics() { - suite.env.RunTestInAPIMode(suite.checkMetrics) + suite.env.RunTestInPDServiceMode(suite.checkMetrics) } func (suite *apiTestSuite) checkMetrics(cluster *tests.TestCluster) { @@ -577,7 +577,7 @@ func (suite *apiTestSuite) checkMetrics(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestStatus() { - suite.env.RunTestInAPIMode(suite.checkStatus) + suite.env.RunTestInPDServiceMode(suite.checkStatus) } func (suite *apiTestSuite) checkStatus(cluster *tests.TestCluster) { @@ -600,7 +600,7 @@ func (suite *apiTestSuite) checkStatus(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestStores() { - suite.env.RunTestInAPIMode(suite.checkStores) + suite.env.RunTestInPDServiceMode(suite.checkStores) } func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) { @@ -647,8 +647,8 @@ func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) { tests.MustPutStore(re, cluster, store) } // Test /stores - apiServerAddr := cluster.GetLeaderServer().GetAddr() - urlPrefix := fmt.Sprintf("%s/pd/api/v1/stores", apiServerAddr) + pdServiceAddr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/stores", pdServiceAddr) var resp map[string]any err := testutil.ReadGetJSON(re, tests.TestDialClient, urlPrefix, &resp) re.NoError(err) @@ -682,7 +682,7 @@ func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) { } func (suite *apiTestSuite) TestRegions() { - suite.env.RunTestInAPIMode(suite.checkRegions) + suite.env.RunTestInPDServiceMode(suite.checkRegions) } func (suite *apiTestSuite) checkRegions(cluster *tests.TestCluster) { @@ -691,8 +691,8 @@ func (suite *apiTestSuite) checkRegions(cluster *tests.TestCluster) { tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d")) tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f")) // Test /regions - apiServerAddr := cluster.GetLeaderServer().GetAddr() - urlPrefix := fmt.Sprintf("%s/pd/api/v1/regions", apiServerAddr) + pdServiceAddr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/regions", pdServiceAddr) var resp map[string]any err := testutil.ReadGetJSON(re, tests.TestDialClient, urlPrefix, &resp) re.NoError(err) diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index d7d200814bb..b4962e0a1b7 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/mcs/utils/constant" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/schedule/types" @@ -62,7 +63,7 @@ func (suite *configTestSuite) SetupSuite() { schedulers.Register() var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1, []string{constant.PDServiceName}) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) @@ -132,7 +133,7 @@ func (suite *configTestSuite) TestConfigWatch() { watcher.Close() } -// Manually trigger the config persistence in the PD API server side. +// Manually trigger the config persistence in the PD service side. func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) { err := pdLeaderServer.GetPersistOptions().Persist(pdLeaderServer.GetServer().GetStorage()) re.NoError(err) @@ -152,19 +153,19 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { ) re.NoError(err) // Get all default scheduler names. - var namesFromAPIServer []string + var namesFromPDServer []string testutil.Eventually(re, func() bool { - namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() - return len(namesFromAPIServer) == len(sc.DefaultSchedulers) + namesFromPDServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() + return len(namesFromPDServer) == len(sc.DefaultSchedulers) }) // Check all default schedulers' configs. var namesFromSchedulingServer []string testutil.Eventually(re, func() bool { namesFromSchedulingServer, _, err = storage.LoadAllSchedulerConfigs() re.NoError(err) - return len(namesFromSchedulingServer) == len(namesFromAPIServer) + return len(namesFromSchedulingServer) == len(namesFromPDServer) }) - re.Equal(namesFromAPIServer, namesFromSchedulingServer) + re.Equal(namesFromPDServer, namesFromSchedulingServer) // Add a new scheduler. api.MustAddScheduler(re, suite.pdLeaderServer.GetAddr(), types.EvictLeaderScheduler.String(), map[string]any{ "store_id": 1, diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index 4e0d5249fdb..ec9b88d5de9 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/scheduling/server/meta" + "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/tests" @@ -53,7 +54,7 @@ func (suite *metaTestSuite) SetupSuite() { re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1, []string{constant.PDServiceName}) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go index 880dfddbb16..c407a742ed9 100644 --- a/tests/integrations/mcs/scheduling/rule_test.go +++ b/tests/integrations/mcs/scheduling/rule_test.go @@ -54,7 +54,7 @@ func (suite *ruleTestSuite) SetupSuite() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) @@ -97,7 +97,7 @@ func (suite *ruleTestSuite) TestRuleWatch() { re.Equal(placement.DefaultGroupID, ruleGroups[0].ID) re.Equal(0, ruleGroups[0].Index) re.False(ruleGroups[0].Override) - // Set a new rule via the PD API server. + // Set a new rule via the PD service. apiRuleManager := suite.pdLeaderServer.GetRaftCluster().GetRuleManager() rule := &placement.Rule{ GroupID: "2", diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 3401fb880cb..abb9569c808 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -66,7 +66,7 @@ func (suite *serverTestSuite) SetupSuite() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -220,7 +220,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() { // Change back to the default value. conf.EnableSchedulingFallback = true leaderServer.SetMicroServiceConfig(*conf) - // API server will execute scheduling jobs since there is no scheduling server. + // PD server will execute scheduling jobs since there is no scheduling server. testutil.Eventually(re, func() bool { return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) @@ -229,7 +229,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() { re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) - // After scheduling server is started, API server will not execute scheduling jobs. + // After scheduling server is started, PD server will not execute scheduling jobs. testutil.Eventually(re, func() bool { return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) @@ -238,7 +238,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() { return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() }) tc.GetPrimaryServer().Close() - // Stop scheduling server. API server will execute scheduling jobs again. + // Stop scheduling server. PD server will execute scheduling jobs again. testutil.Eventually(re, func() bool { return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) @@ -246,7 +246,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() { re.NoError(err) defer tc1.Destroy() tc1.WaitForPrimaryServing(re) - // After scheduling server is started, API server will not execute scheduling jobs. + // After scheduling server is started, PD server will not execute scheduling jobs. testutil.Eventually(re, func() bool { return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) @@ -259,21 +259,21 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() { func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() { re := suite.Require() - // API server will execute scheduling jobs since there is no scheduling server. + // PD server will execute scheduling jobs since there is no scheduling server. testutil.Eventually(re, func() bool { re.NotNil(suite.pdLeader.GetServer()) re.NotNil(suite.pdLeader.GetServer().GetRaftCluster()) return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) leaderServer := suite.pdLeader.GetServer() - // After Disabling scheduling service fallback, the API server will stop scheduling. + // After Disabling scheduling service fallback, the PD server will stop scheduling. conf := leaderServer.GetMicroServiceConfig().Clone() conf.EnableSchedulingFallback = false leaderServer.SetMicroServiceConfig(*conf) testutil.Eventually(re, func() bool { return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) - // Enable scheduling service fallback again, the API server will restart scheduling. + // Enable scheduling service fallback again, the PD server will restart scheduling. conf.EnableSchedulingFallback = true leaderServer.SetMicroServiceConfig(*conf) testutil.Eventually(re, func() bool { @@ -284,7 +284,7 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() { re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) - // After scheduling server is started, API server will not execute scheduling jobs. + // After scheduling server is started, PD server will not execute scheduling jobs. testutil.Eventually(re, func() bool { return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) @@ -292,7 +292,7 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() { testutil.Eventually(re, func() bool { return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() }) - // Disable scheduling service fallback and stop scheduling server. API server won't execute scheduling jobs again. + // Disable scheduling service fallback and stop scheduling server. PD server won't execute scheduling jobs again. conf.EnableSchedulingFallback = false leaderServer.SetMicroServiceConfig(*conf) tc.GetPrimaryServer().Close() @@ -310,14 +310,14 @@ func (suite *serverTestSuite) TestSchedulerSync() { tc.WaitForPrimaryServing(re) schedulersController := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetSchedulersController() checkEvictLeaderSchedulerExist(re, schedulersController, false) - // Add a new evict-leader-scheduler through the API server. + // Add a new evict-leader-scheduler through the PD server. api.MustAddScheduler(re, suite.backendEndpoints, types.EvictLeaderScheduler.String(), map[string]any{ "store_id": 1, }) // Check if the evict-leader-scheduler is added. checkEvictLeaderSchedulerExist(re, schedulersController, true) checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) - // Add a store_id to the evict-leader-scheduler through the API server. + // Add a store_id to the evict-leader-scheduler through the PD server. err = suite.pdLeader.GetServer().GetRaftCluster().PutMetaStore( &metapb.Store{ Id: 2, @@ -334,18 +334,18 @@ func (suite *serverTestSuite) TestSchedulerSync() { }) checkEvictLeaderSchedulerExist(re, schedulersController, true) checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) - // Delete a store_id from the evict-leader-scheduler through the API server. + // Delete a store_id from the evict-leader-scheduler through the PD server. api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", types.EvictLeaderScheduler.String(), 1)) checkEvictLeaderSchedulerExist(re, schedulersController, true) checkEvictLeaderStoreIDs(re, schedulersController, []uint64{2}) - // Add a store_id to the evict-leader-scheduler through the API server by the scheduler handler. + // Add a store_id to the evict-leader-scheduler through the PD server by the scheduler handler. api.MustCallSchedulerConfigAPI(re, http.MethodPost, suite.backendEndpoints, types.EvictLeaderScheduler.String(), []string{"config"}, map[string]any{ "name": types.EvictLeaderScheduler.String(), "store_id": 1, }) checkEvictLeaderSchedulerExist(re, schedulersController, true) checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) - // Delete a store_id from the evict-leader-scheduler through the API server by the scheduler handler. + // Delete a store_id from the evict-leader-scheduler through the PD server by the scheduler handler. api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, types.EvictLeaderScheduler.String(), []string{"delete", "2"}, nil) checkEvictLeaderSchedulerExist(re, schedulersController, true) checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) @@ -354,7 +354,7 @@ func (suite *serverTestSuite) TestSchedulerSync() { // Check if the scheduler is removed. checkEvictLeaderSchedulerExist(re, schedulersController, false) - // Delete the evict-leader-scheduler through the API server by removing the last store_id. + // Delete the evict-leader-scheduler through the PD server by removing the last store_id. api.MustAddScheduler(re, suite.backendEndpoints, types.EvictLeaderScheduler.String(), map[string]any{ "store_id": 1, }) @@ -363,7 +363,7 @@ func (suite *serverTestSuite) TestSchedulerSync() { api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", types.EvictLeaderScheduler.String(), 1)) checkEvictLeaderSchedulerExist(re, schedulersController, false) - // Delete the evict-leader-scheduler through the API server. + // Delete the evict-leader-scheduler through the PD server. api.MustAddScheduler(re, suite.backendEndpoints, types.EvictLeaderScheduler.String(), map[string]any{ "store_id": 1, }) @@ -551,7 +551,7 @@ func (suite *serverTestSuite) TestStoreLimit() { leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.RemovePeer, 60) leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 60) leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60) - // There is a time window between setting store limit in API service side and capturing the change in scheduling service. + // There is a time window between setting store limit in PD service side and capturing the change in scheduling service. waitSyncFinish(re, tc, storelimit.AddPeer, 60) for i := uint64(1); i <= 5; i++ { op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) @@ -636,7 +636,7 @@ func (suite *multipleServerTestSuite) SetupSuite() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 2) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 2) re.NoError(err) err = suite.cluster.RunInitialServers() diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index 91614530ef1..d8147d8cdae 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -62,7 +62,7 @@ func (suite *tsoAPITestSuite) SetupTest() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.pdCluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.pdCluster, err = tests.NewTestMSCluster(suite.ctx, 1) re.NoError(err) err = suite.pdCluster.RunInitialServers() re.NoError(err) @@ -137,7 +137,7 @@ func TestTSOServerStartFirst(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - apiCluster, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + apiCluster, err := tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{"k1", "k2"} }) defer apiCluster.Destroy() @@ -200,7 +200,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestAPICluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, []string{constant.PDServiceName}) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -227,7 +227,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) { testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"), testutil.WithHeader(re, apiutil.XForwardedToMicroServiceHeader, "true")) re.NoError(err) - // If close tso server, it should try forward to tso server, but return error in api mode. + // If close tso server, it should try forward to tso server, but return error in pd service mode. ttc.Destroy() err = testutil.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/%s", urlPrefix, "admin/reset-ts"), input, testutil.Status(re, http.StatusInternalServerError), testutil.StringContain(re, "[PD:apiutil:ErrRedirect]redirect failed")) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 2c19f6588e5..d1253fb16a1 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -82,7 +82,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) @@ -537,8 +537,8 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) - // Init api server config but not start. - tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + // Init PD service config but not start. + tc, err := tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } @@ -546,7 +546,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(err) pdAddr := tc.GetConfig().GetClientURL() - // Start api server and tso server. + // Start PD service and tso server. err = tc.RunInitialServers() re.NoError(err) defer tc.Destroy() @@ -734,8 +734,8 @@ func TestGetTSOImmediately(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) - // Init api server config but not start. - tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + // Init PD service config but not start. + tc, err := tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } @@ -743,7 +743,7 @@ func TestGetTSOImmediately(t *testing.T) { re.NoError(err) pdAddr := tc.GetConfig().GetClientURL() - // Start api server and tso server. + // Start PD service and tso server. err = tc.RunInitialServers() re.NoError(err) defer tc.Destroy() diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go index b564076c1f0..2950047f0ab 100644 --- a/tests/integrations/mcs/tso/proxy_test.go +++ b/tests/integrations/mcs/tso/proxy_test.go @@ -62,7 +62,7 @@ func (s *tsoProxyTestSuite) SetupSuite() { var err error s.ctx, s.cancel = context.WithCancel(context.Background()) // Create an API cluster with 1 server - s.apiCluster, err = tests.NewTestAPICluster(s.ctx, 1) + s.apiCluster, err = tests.NewTestMSCluster(s.ctx, 1) re.NoError(err) err = s.apiCluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 09a199c2d52..c4a3e522bc8 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -75,7 +75,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -156,23 +156,23 @@ func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() { func TestTSOPath(t *testing.T) { re := require.New(t) - checkTSOPath(re, true /*isAPIServiceMode*/) - checkTSOPath(re, false /*isAPIServiceMode*/) + checkTSOPath(re, true /*isPDServiceMode*/) + checkTSOPath(re, false /*isPDServiceMode*/) } -func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { +func checkTSOPath(re *require.Assertions, isPDServiceMode bool) { var ( cluster *tests.TestCluster err error ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if isAPIServiceMode { - cluster, err = tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + if isPDServiceMode { + cluster, err = tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.MicroService.EnableTSODynamicSwitching = false }) } else { - cluster, err = tests.NewTestCluster(ctx, 1) + cluster, err = tests.NewTestCluster(ctx, 1, nil) } re.NoError(err) defer cluster.Destroy() @@ -184,7 +184,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { re.NoError(pdLeader.BootstrapCluster()) backendEndpoints := pdLeader.GetAddr() client := pdLeader.GetEtcdClient() - if isAPIServiceMode { + if isPDServiceMode { re.Equal(0, getEtcdTimestampKeyNum(re, client)) } else { re.Equal(1, getEtcdTimestampKeyNum(re, client)) @@ -217,7 +217,7 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int return count } -type APIServerForward struct { +type PDServerForward struct { re *require.Assertions ctx context.Context cancel context.CancelFunc @@ -227,13 +227,13 @@ type APIServerForward struct { pdClient pd.Client } -func NewAPIServerForward(re *require.Assertions) APIServerForward { - suite := APIServerForward{ +func NewPDServerForward(re *require.Assertions) PDServerForward { + suite := PDServerForward{ re: re, } var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 3) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -254,7 +254,7 @@ func NewAPIServerForward(re *require.Assertions) APIServerForward { return suite } -func (suite *APIServerForward) ShutDown() { +func (suite *PDServerForward) ShutDown() { suite.pdClient.Close() re := suite.re @@ -273,7 +273,7 @@ func (suite *APIServerForward) ShutDown() { func TestForwardTSORelated(t *testing.T) { re := require.New(t) - suite := NewAPIServerForward(re) + suite := NewPDServerForward(re) defer suite.ShutDown() leaderServer := suite.cluster.GetLeaderServer().GetServer() cfg := leaderServer.GetMicroServiceConfig().Clone() @@ -290,7 +290,7 @@ func TestForwardTSORelated(t *testing.T) { func TestForwardTSOWhenPrimaryChanged(t *testing.T) { re := require.New(t) - suite := NewAPIServerForward(re) + suite := NewPDServerForward(re) defer suite.ShutDown() tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -330,7 +330,7 @@ func TestForwardTSOWhenPrimaryChanged(t *testing.T) { func TestResignTSOPrimaryForward(t *testing.T) { re := require.New(t) - suite := NewAPIServerForward(re) + suite := NewPDServerForward(re) defer suite.ShutDown() // TODO: test random kill primary with 3 nodes tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -356,7 +356,7 @@ func TestResignTSOPrimaryForward(t *testing.T) { func TestResignAPIPrimaryForward(t *testing.T) { re := require.New(t) - suite := NewAPIServerForward(re) + suite := NewPDServerForward(re) defer suite.ShutDown() tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -380,7 +380,7 @@ func TestResignAPIPrimaryForward(t *testing.T) { func TestForwardTSOUnexpectedToFollower1(t *testing.T) { re := require.New(t) - suite := NewAPIServerForward(re) + suite := NewPDServerForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { // unary call will retry internally @@ -393,7 +393,7 @@ func TestForwardTSOUnexpectedToFollower1(t *testing.T) { func TestForwardTSOUnexpectedToFollower2(t *testing.T) { re := require.New(t) - suite := NewAPIServerForward(re) + suite := NewPDServerForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { // unary call will retry internally @@ -407,7 +407,7 @@ func TestForwardTSOUnexpectedToFollower2(t *testing.T) { func TestForwardTSOUnexpectedToFollower3(t *testing.T) { re := require.New(t) - suite := NewAPIServerForward(re) + suite := NewPDServerForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { _, _, err := suite.pdClient.GetTS(suite.ctx) @@ -415,7 +415,7 @@ func TestForwardTSOUnexpectedToFollower3(t *testing.T) { }) } -func (suite *APIServerForward) checkForwardTSOUnexpectedToFollower(checkTSO func()) { +func (suite *PDServerForward) checkForwardTSOUnexpectedToFollower(checkTSO func()) { re := suite.re tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) @@ -451,7 +451,7 @@ func (suite *APIServerForward) checkForwardTSOUnexpectedToFollower(checkTSO func tc.Destroy() } -func (suite *APIServerForward) addRegions() { +func (suite *PDServerForward) addRegions() { leader := suite.cluster.GetServer(suite.cluster.WaitLeader()) rc := leader.GetServer().GetRaftCluster() for i := range 3 { @@ -465,7 +465,7 @@ func (suite *APIServerForward) addRegions() { } } -func (suite *APIServerForward) checkUnavailableTSO(re *require.Assertions) { +func (suite *PDServerForward) checkUnavailableTSO(re *require.Assertions) { _, _, err := suite.pdClient.GetTS(suite.ctx) re.Error(err) // try to update gc safe point @@ -476,7 +476,7 @@ func (suite *APIServerForward) checkUnavailableTSO(re *require.Assertions) { re.Error(err) } -func (suite *APIServerForward) checkAvailableTSO(re *require.Assertions) { +func (suite *PDServerForward) checkAvailableTSO(re *require.Assertions) { mcs.WaitForTSOServiceAvailable(suite.ctx, re, suite.pdClient) // try to get ts _, _, err := suite.pdClient.GetTS(suite.ctx) @@ -512,7 +512,7 @@ func (suite *CommonTestSuite) SetupSuite() { var err error re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -576,7 +576,7 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() { } check() - s, err := suite.cluster.JoinAPIServer(suite.ctx) + s, err := suite.cluster.JoinPDServer(suite.ctx) re.NoError(err) re.NoError(s.Run()) @@ -598,7 +598,7 @@ func TestTSOServiceSwitch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestAPICluster(ctx, 1, + tc, err := tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.MicroService.EnableTSODynamicSwitching = true }, diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 2cda9f8734f..ced9ab473e7 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -98,7 +98,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { if suite.legacy { suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) } else { - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, serverCount, func(conf *config.Config, _ string) { + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, serverCount, func(conf *config.Config, _ string) { conf.MicroService.EnableTSODynamicSwitching = false }) } @@ -498,7 +498,7 @@ func TestMixedTSODeployment(t *testing.T) { }() ctx, cancel := context.WithCancel(context.Background()) - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cancel() defer cluster.Destroy() @@ -510,7 +510,7 @@ func TestMixedTSODeployment(t *testing.T) { re.NotNil(leaderServer) backendEndpoints := leaderServer.GetAddr() - apiSvr, err := cluster.JoinAPIServer(ctx) + apiSvr, err := cluster.JoinPDServer(ctx) re.NoError(err) err = apiSvr.Run() re.NoError(err) @@ -544,7 +544,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // Create an API cluster which has 3 servers - apiCluster, err := tests.NewTestAPICluster(ctx, 3) + apiCluster, err := tests.NewTestMSCluster(ctx, 3) re.NoError(err) err = apiCluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go index 147f41a4591..a4fed7d9463 100644 --- a/tests/integrations/tso/consistency_test.go +++ b/tests/integrations/tso/consistency_test.go @@ -76,7 +76,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() { if suite.legacy { suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) } else { - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, serverCount) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, serverCount) } re.NoError(err) err = suite.cluster.RunInitialServers() diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index f03db197b35..814d1d4b8f0 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -74,7 +74,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { if suite.legacy { suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) } else { - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, serverCount) + suite.cluster, err = tests.NewTestMSCluster(suite.ctx, serverCount) } re.NoError(err) err = suite.cluster.RunInitialServers() diff --git a/tests/registry/registry_test.go b/tests/registry/registry_test.go index 87d79c4f896..9926c53d693 100644 --- a/tests/registry/registry_test.go +++ b/tests/registry/registry_test.go @@ -71,7 +71,7 @@ func TestRegistryService(t *testing.T) { registry.ServerServiceRegistry.RegisterService("test", newTestServiceRegistry) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 00c43d11309..6e730529356 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -124,7 +124,7 @@ func (suite *middlewareTestSuite) SetupSuite() { re.NoError(failpoint.Enable("github.com/tikv/pd/server/api/enableFailpointAPI", "return(true)")) ctx, cancel := context.WithCancel(context.Background()) suite.cleanup = cancel - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) re.NoError(cluster.RunInitialServers()) re.NotEmpty(cluster.WaitLeader()) @@ -199,7 +199,7 @@ func (suite *middlewareTestSuite) TestRequestInfoMiddleware() { func BenchmarkDoRequestWithServiceMiddleware(b *testing.B) { b.StopTimer() ctx, cancel := context.WithCancel(context.Background()) - cluster, _ := tests.NewTestCluster(ctx, 1) + cluster, _ := tests.NewTestCluster(ctx, 1, nil) cluster.RunInitialServers() cluster.WaitLeader() leader := cluster.GetLeaderServer() @@ -506,7 +506,7 @@ func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { func BenchmarkDoRequestWithLocalLogAudit(b *testing.B) { b.StopTimer() ctx, cancel := context.WithCancel(context.Background()) - cluster, _ := tests.NewTestCluster(ctx, 1) + cluster, _ := tests.NewTestCluster(ctx, 1, nil) cluster.RunInitialServers() cluster.WaitLeader() leader := cluster.GetLeaderServer() @@ -528,7 +528,7 @@ func BenchmarkDoRequestWithLocalLogAudit(b *testing.B) { func BenchmarkDoRequestWithPrometheusAudit(b *testing.B) { b.StopTimer() ctx, cancel := context.WithCancel(context.Background()) - cluster, _ := tests.NewTestCluster(ctx, 1) + cluster, _ := tests.NewTestCluster(ctx, 1, nil) cluster.RunInitialServers() cluster.WaitLeader() leader := cluster.GetLeaderServer() @@ -550,7 +550,7 @@ func BenchmarkDoRequestWithPrometheusAudit(b *testing.B) { func BenchmarkDoRequestWithoutServiceMiddleware(b *testing.B) { b.StopTimer() ctx, cancel := context.WithCancel(context.Background()) - cluster, _ := tests.NewTestCluster(ctx, 1) + cluster, _ := tests.NewTestCluster(ctx, 1, nil) cluster.RunInitialServers() cluster.WaitLeader() leader := cluster.GetLeaderServer() diff --git a/tests/server/api/scheduler_test.go b/tests/server/api/scheduler_test.go index 1f76c469cfd..479bc1fc2e1 100644 --- a/tests/server/api/scheduler_test.go +++ b/tests/server/api/scheduler_test.go @@ -55,7 +55,7 @@ func TestPDSchedulingTestSuite(t *testing.T) { func TestAPISchedulingTestSuite(t *testing.T) { suite.Run(t, &scheduleTestSuite{ - runMode: tests.APIMode, + runMode: tests.PDMSMode, }) } diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index ea8f14cd971..3354fd69211 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -51,7 +51,7 @@ func (suite *keyspaceTestSuite) SetupTest() { re := suite.Require() ctx, cancel := context.WithCancel(context.Background()) suite.cleanup = cancel - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/server/apiv2/handlers/ready_test.go b/tests/server/apiv2/handlers/ready_test.go index 111fc06784d..d5a18e71f28 100644 --- a/tests/server/apiv2/handlers/ready_test.go +++ b/tests/server/apiv2/handlers/ready_test.go @@ -34,7 +34,7 @@ func TestReady(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() re.NoError(cluster.RunInitialServers()) diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index 796fd514eef..03b8c28d879 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -42,7 +42,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) { func (suite *keyspaceGroupTestSuite) SetupTest() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - cluster, err := tests.NewTestAPICluster(suite.ctx, 1) + cluster, err := tests.NewTestMSCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 357a76ace21..d441f93bfc6 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -74,7 +74,7 @@ func TestBootstrap(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -114,7 +114,7 @@ func TestDamagedRegion(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -198,7 +198,7 @@ func TestRegionStatistics(t *testing.T) { }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 3) + tc, err := tests.NewTestCluster(ctx, 3, nil) defer tc.Destroy() re.NoError(err) @@ -290,7 +290,7 @@ func TestStaleRegion(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -335,7 +335,7 @@ func TestGetPutConfig(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -564,7 +564,7 @@ func TestRaftClusterRestart(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -731,7 +731,7 @@ func TestGetPDMembers(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -754,7 +754,7 @@ func TestNotLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 2) + tc, err := tests.NewTestCluster(ctx, 2, nil) defer tc.Destroy() re.NoError(err) re.NoError(tc.RunInitialServers()) @@ -775,7 +775,7 @@ func TestStoreVersionChange(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -815,7 +815,7 @@ func TestConcurrentHandleRegion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dashboard.SetCheckInterval(30 * time.Minute) - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1000,7 +1000,7 @@ func TestLoadClusterInfo(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) @@ -1244,7 +1244,7 @@ func TestOfflineStoreLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dashboard.SetCheckInterval(30 * time.Minute) - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1336,7 +1336,7 @@ func TestUpgradeStoreLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dashboard.SetCheckInterval(30 * time.Minute) - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1394,7 +1394,7 @@ func TestStaleTermHeartbeat(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dashboard.SetCheckInterval(30 * time.Minute) - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer tc.Destroy() err = tc.RunInitialServers() @@ -1481,7 +1481,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) - tc, err := tests.NewTestCluster(ctx, 2) + tc, err := tests.NewTestCluster(ctx, 2, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1644,7 +1644,7 @@ func TestMinResolvedTS(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cluster.DefaultMinResolvedTSPersistenceInterval = time.Millisecond - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1757,7 +1757,7 @@ func TestTransferLeaderBack(t *testing.T) { }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 2) + tc, err := tests.NewTestCluster(ctx, 2, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1819,7 +1819,7 @@ func TestExternalTimestamp(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1930,7 +1930,7 @@ func TestPatrolRegionConfigChange(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -1988,7 +1988,7 @@ func TestFollowerExitSyncTime(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, nil) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() diff --git a/tests/server/cluster/cluster_work_test.go b/tests/server/cluster/cluster_work_test.go index e0ba916bf7b..9b2cf474612 100644 --- a/tests/server/cluster/cluster_work_test.go +++ b/tests/server/cluster/cluster_work_test.go @@ -35,7 +35,7 @@ func TestValidRequestRegion(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) @@ -79,7 +79,7 @@ func TestAskSplit(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) @@ -136,7 +136,7 @@ func TestPendingProcessedRegions(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) diff --git a/tests/server/config/config_test.go b/tests/server/config/config_test.go index bb387b68030..eaf8f833133 100644 --- a/tests/server/config/config_test.go +++ b/tests/server/config/config_test.go @@ -41,7 +41,7 @@ func TestRateLimitConfigReload(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) re.NoError(err) defer cluster.Destroy() re.NoError(cluster.RunInitialServers()) diff --git a/tests/server/id/id_test.go b/tests/server/id/id_test.go index e4633c480f2..586f93ea261 100644 --- a/tests/server/id/id_test.go +++ b/tests/server/id/id_test.go @@ -40,7 +40,7 @@ func TestID(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() @@ -86,7 +86,7 @@ func TestCommand(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() @@ -116,7 +116,7 @@ func TestMonotonicID(t *testing.T) { }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 2, nil) re.NoError(err) defer cluster.Destroy() @@ -163,7 +163,7 @@ func TestPDRestart(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) defer cluster.Destroy() diff --git a/tests/server/join/join_fail/join_fail_test.go b/tests/server/join/join_fail/join_fail_test.go index a037aa9e7b5..c9114792150 100644 --- a/tests/server/join/join_fail/join_fail_test.go +++ b/tests/server/join/join_fail/join_fail_test.go @@ -29,7 +29,7 @@ func TestFailedPDJoinInStep1(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) diff --git a/tests/server/join/join_test.go b/tests/server/join/join_test.go index 8b0ce918377..90b253e2f1f 100644 --- a/tests/server/join/join_test.go +++ b/tests/server/join/join_test.go @@ -38,7 +38,7 @@ func TestSimpleJoin(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) @@ -87,7 +87,7 @@ func TestFailedAndDeletedPDJoinsPreviousCluster(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() server.EtcdStartTimeout = 10 * time.Second - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) @@ -120,7 +120,7 @@ func TestDeletedPDJoinsPreviousCluster(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() server.EtcdStartTimeout = 10 * time.Second - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) @@ -151,7 +151,7 @@ func TestFailedPDJoinsPreviousCluster(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index cb8fd7bda00..2683d44b42b 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -48,7 +48,7 @@ func TestMemberDelete(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) @@ -205,7 +205,7 @@ func TestLeaderResign(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) @@ -229,7 +229,7 @@ func TestLeaderResignWithBlock(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) @@ -252,7 +252,7 @@ func TestPDLeaderLostWhileEtcdLeaderIntact(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 2, nil) defer cluster.Destroy() re.NoError(err) @@ -328,7 +328,7 @@ func TestCampaignLeaderFrequently(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) diff --git a/tests/server/server_test.go b/tests/server/server_test.go index 77cd7aa5158..a6865171dcb 100644 --- a/tests/server/server_test.go +++ b/tests/server/server_test.go @@ -42,7 +42,7 @@ func TestUpdateAdvertiseUrls(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 2, nil) defer cluster.Destroy() re.NoError(err) @@ -67,7 +67,7 @@ func TestUpdateAdvertiseUrls(t *testing.T) { for _, conf := range cluster.GetConfig().InitialServers { serverConf, err := conf.Generate() re.NoError(err) - s, err := tests.NewTestServer(ctx, serverConf) + s, err := tests.NewTestServer(ctx, serverConf, nil) re.NoError(err) cluster.GetServers()[conf.Name] = s } @@ -83,7 +83,7 @@ func TestClusterID(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) @@ -113,7 +113,7 @@ func TestLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, nil) defer cluster.Destroy() re.NoError(err) @@ -134,7 +134,7 @@ func TestGRPCRateLimit(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) defer cluster.Destroy() re.NoError(err) diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go index 51189966878..51fa94ce658 100644 --- a/tests/server/tso/tso_test.go +++ b/tests/server/tso/tso_test.go @@ -41,7 +41,7 @@ func TestRequestFollower(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 2, nil) re.NoError(err) defer cluster.Destroy() @@ -85,7 +85,7 @@ func TestDelaySyncTimestamp(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 2, nil) re.NoError(err) defer cluster.Destroy() re.NoError(cluster.RunInitialServers()) diff --git a/tests/testutil.go b/tests/testutil.go index 5e99b3dbeda..e098a22ea16 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -279,8 +279,8 @@ const ( Both SchedulerMode = iota // PDMode represents PD mode. PDMode - // APIMode represents API mode. - APIMode + // PDMSMode represents API mode. + PDMSMode ) // SchedulingTestEnvironment is used for test purpose. @@ -308,11 +308,11 @@ func (s *SchedulingTestEnvironment) RunTestBasedOnMode(test func(*TestCluster)) switch s.RunMode { case PDMode: s.RunTestInPDMode(test) - case APIMode: - s.RunTestInAPIMode(test) + case PDMSMode: + s.RunTestInPDServiceMode(test) default: s.RunTestInPDMode(test) - s.RunTestInAPIMode(test) + s.RunTestInPDServiceMode(test) } } @@ -339,8 +339,8 @@ func getTestName() string { return "" } -// RunTestInAPIMode is to run test in api mode. -func (s *SchedulingTestEnvironment) RunTestInAPIMode(test func(*TestCluster)) { +// RunTestInPDServiceMode is to run test in pd service mode. +func (s *SchedulingTestEnvironment) RunTestInPDServiceMode(test func(*TestCluster)) { re := require.New(s.t) re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) @@ -348,11 +348,11 @@ func (s *SchedulingTestEnvironment) RunTestInAPIMode(test func(*TestCluster)) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) }() - s.t.Logf("start test %s in api mode", getTestName()) - if _, ok := s.clusters[APIMode]; !ok { - s.startCluster(APIMode) + s.t.Logf("start test %s in pd service mode", getTestName()) + if _, ok := s.clusters[PDMSMode]; !ok { + s.startCluster(PDMSMode) } - test(s.clusters[APIMode]) + test(s.clusters[PDMSMode]) } // Cleanup is to cleanup the environment. @@ -379,8 +379,8 @@ func (s *SchedulingTestEnvironment) startCluster(m SchedulerMode) { leaderServer := cluster.GetServer(cluster.GetLeader()) re.NoError(leaderServer.BootstrapCluster()) s.clusters[PDMode] = cluster - case APIMode: - cluster, err := NewTestAPICluster(ctx, 1, s.opts...) + case PDMSMode: + cluster, err := NewTestCluster(ctx, 1, s.opts...) re.NoError(err) err = cluster.RunInitialServers() re.NoError(err) @@ -398,7 +398,7 @@ func (s *SchedulingTestEnvironment) startCluster(m SchedulerMode) { testutil.Eventually(re, func() bool { return cluster.GetLeaderServer().GetServer().IsServiceIndependent(constant.SchedulingServiceName) }) - s.clusters[APIMode] = cluster + s.clusters[PDMSMode] = cluster } } diff --git a/tools/pd-backup/tests/backup_test.go b/tools/pd-backup/tests/backup_test.go index 7c8c03d96e0..b8c9471ce7f 100644 --- a/tools/pd-backup/tests/backup_test.go +++ b/tools/pd-backup/tests/backup_test.go @@ -33,7 +33,7 @@ func TestBackup(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, nil) re.NoError(err) err = cluster.RunInitialServers() re.NoError(err) diff --git a/tools/pd-ctl/pdctl/command/config_command.go b/tools/pd-ctl/pdctl/command/config_command.go index 2e9903db550..f8357b865f4 100644 --- a/tools/pd-ctl/pdctl/command/config_command.go +++ b/tools/pd-ctl/pdctl/command/config_command.go @@ -49,8 +49,8 @@ const ( ruleBundlePrefix = "pd/api/v1/config/placement-rule" pdServerPrefix = "pd/api/v1/config/pd-server" serviceMiddlewareConfigPrefix = "pd/api/v1/service-middleware/config" - // flagFromAPIServer has no influence for pd mode, but it is useful for us to debug in api mode. - flagFromAPIServer = "from_api_server" + // flagFromPDService has no influence for pd mode, but it is useful for us to debug in pd service mode. + flagFromPDService = "from_api_server" ) // NewConfigCommand return a config subcommand of rootCmd @@ -81,7 +81,7 @@ func NewShowConfigCommand() *cobra.Command { sc.AddCommand(newShowReplicationModeCommand()) sc.AddCommand(NewShowServerConfigCommand()) sc.AddCommand(NewShowServiceMiddlewareConfigCommand()) - sc.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + sc.Flags().Bool(flagFromPDService, false, "read data from PD service rather than micro service") return sc } @@ -92,7 +92,7 @@ func NewShowAllConfigCommand() *cobra.Command { Short: "show all config of PD", Run: showAllConfigCommandFunc, } - sc.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + sc.Flags().Bool(flagFromPDService, false, "read data from PD service rather than micro service") return sc } @@ -103,7 +103,7 @@ func NewShowScheduleConfigCommand() *cobra.Command { Short: "show schedule config of PD", Run: showScheduleConfigCommandFunc, } - sc.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + sc.Flags().Bool(flagFromPDService, false, "read data from PD service rather than micro service") return sc } @@ -114,7 +114,7 @@ func NewShowReplicationConfigCommand() *cobra.Command { Short: "show replication config of PD", Run: showReplicationConfigCommandFunc, } - sc.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + sc.Flags().Bool(flagFromPDService, false, "read data from PD service rather than micro service") return sc } @@ -528,7 +528,7 @@ func NewPlacementRulesCommand() *cobra.Command { show.Flags().String("id", "", "rule id") show.Flags().String("region", "", "region id") show.Flags().Bool("detail", false, "detailed match info for region") - show.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + show.Flags().Bool(flagFromPDService, false, "read data from PD server rather than micro service") load := &cobra.Command{ Use: "load", Short: "load placement rules to a file", @@ -538,7 +538,7 @@ func NewPlacementRulesCommand() *cobra.Command { load.Flags().String("id", "", "rule id") load.Flags().String("region", "", "region id") load.Flags().String("out", "rules.json", "the filename contains rules") - load.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + load.Flags().Bool(flagFromPDService, false, "read data from PD server rather than micro service") save := &cobra.Command{ Use: "save", Short: "save rules from file", @@ -554,7 +554,7 @@ func NewPlacementRulesCommand() *cobra.Command { Short: "show rule group configuration(s)", Run: showRuleGroupFunc, } - ruleGroupShow.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + ruleGroupShow.Flags().Bool(flagFromPDService, false, "read data from PD server rather than micro service") ruleGroupSet := &cobra.Command{ Use: "set ", Short: "update rule group configuration", @@ -577,7 +577,7 @@ func NewPlacementRulesCommand() *cobra.Command { Run: getRuleBundle, } ruleBundleGet.Flags().String("out", "", "the output file") - ruleBundleGet.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + ruleBundleGet.Flags().Bool(flagFromPDService, false, "read data from PD server rather than micro service") ruleBundleSet := &cobra.Command{ Use: "set", Short: "set rule group config and its rules from file", @@ -596,7 +596,7 @@ func NewPlacementRulesCommand() *cobra.Command { Run: loadRuleBundle, } ruleBundleLoad.Flags().String("out", "rules.json", "the output file") - ruleBundleLoad.Flags().Bool(flagFromAPIServer, false, "read data from api server rather than micro service") + ruleBundleLoad.Flags().Bool(flagFromPDService, false, "read data from PD server rather than micro service") ruleBundleSave := &cobra.Command{ Use: "save", Short: "save all group configs and rules from file", @@ -895,7 +895,7 @@ func saveRuleBundle(cmd *cobra.Command, _ []string) { func buildHeader(cmd *cobra.Command) http.Header { header := http.Header{} - forbiddenRedirectToMicroService, err := cmd.Flags().GetBool(flagFromAPIServer) + forbiddenRedirectToMicroService, err := cmd.Flags().GetBool(flagFromPDService) if err == nil && forbiddenRedirectToMicroService { header.Add(apiutil.XForbiddenForwardToMicroServiceHeader, "true") } diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index b6c58fe2bc6..7bbf3128096 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -383,9 +383,9 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu f.Close() defer os.RemoveAll(fname) - checkScheduleConfig := func(scheduleCfg *sc.ScheduleConfig, isFromAPIServer bool) { + checkScheduleConfig := func(scheduleCfg *sc.ScheduleConfig, isFromPDServer bool) { if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { - if isFromAPIServer { + if isFromPDServer { re.Equal(scheduleCfg.LeaderScheduleLimit, leaderServer.GetPersistOptions().GetLeaderScheduleLimit()) re.NotEqual(scheduleCfg.LeaderScheduleLimit, schedulingServer.GetPersistConfig().GetLeaderScheduleLimit()) } else { @@ -397,9 +397,9 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } } - checkReplicateConfig := func(replicationCfg *sc.ReplicationConfig, isFromAPIServer bool) { + checkReplicateConfig := func(replicationCfg *sc.ReplicationConfig, isFromPDServer bool) { if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { - if isFromAPIServer { + if isFromPDServer { re.Equal(replicationCfg.MaxReplicas, uint64(leaderServer.GetPersistOptions().GetMaxReplicas())) re.NotEqual(int(replicationCfg.MaxReplicas), schedulingServer.GetPersistConfig().GetMaxReplicas()) } else { @@ -411,11 +411,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } } - checkRules := func(rules []*placement.Rule, isFromAPIServer bool) { + checkRules := func(rules []*placement.Rule, isFromPDServer bool) { apiRules := leaderServer.GetRaftCluster().GetRuleManager().GetAllRules() if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { schedulingRules := schedulingServer.GetCluster().GetRuleManager().GetAllRules() - if isFromAPIServer { + if isFromPDServer { re.Len(apiRules, len(rules)) re.NotEqual(len(schedulingRules), len(rules)) } else { @@ -427,11 +427,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } } - checkGroup := func(group placement.RuleGroup, isFromAPIServer bool) { + checkGroup := func(group placement.RuleGroup, isFromPDServer bool) { apiGroup := leaderServer.GetRaftCluster().GetRuleManager().GetRuleGroup(placement.DefaultGroupID) if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { schedulingGroup := schedulingServer.GetCluster().GetRuleManager().GetRuleGroup(placement.DefaultGroupID) - if isFromAPIServer { + if isFromPDServer { re.Equal(apiGroup.Index, group.Index) re.NotEqual(schedulingGroup.Index, group.Index) } else { @@ -444,11 +444,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } testConfig := func(options ...string) { - for _, isFromAPIServer := range []bool{true, false} { + for _, isFromPDServer := range []bool{true, false} { cmd := ctl.GetRootCmd() args := []string{"-u", pdAddr, "config", "show"} args = append(args, options...) - if isFromAPIServer { + if isFromPDServer { args = append(args, "--from_api_server") } output, err := tests.ExecuteCommand(cmd, args...) @@ -456,16 +456,16 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu if len(options) == 0 || options[0] == "all" { cfg := config.Config{} re.NoError(json.Unmarshal(output, &cfg)) - checkReplicateConfig(&cfg.Replication, isFromAPIServer) - checkScheduleConfig(&cfg.Schedule, isFromAPIServer) + checkReplicateConfig(&cfg.Replication, isFromPDServer) + checkScheduleConfig(&cfg.Schedule, isFromPDServer) } else if options[0] == "replication" { replicationCfg := &sc.ReplicationConfig{} re.NoError(json.Unmarshal(output, replicationCfg)) - checkReplicateConfig(replicationCfg, isFromAPIServer) + checkReplicateConfig(replicationCfg, isFromPDServer) } else if options[0] == "schedule" { scheduleCfg := &sc.ScheduleConfig{} re.NoError(json.Unmarshal(output, scheduleCfg)) - checkScheduleConfig(scheduleCfg, isFromAPIServer) + checkScheduleConfig(scheduleCfg, isFromPDServer) } else { re.Fail("no implement") } @@ -473,11 +473,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } testRules := func(options ...string) { - for _, isFromAPIServer := range []bool{true, false} { + for _, isFromPDServer := range []bool{true, false} { cmd := ctl.GetRootCmd() args := []string{"-u", pdAddr, "config", "placement-rules"} args = append(args, options...) - if isFromAPIServer { + if isFromPDServer { args = append(args, "--from_api_server") } output, err := tests.ExecuteCommand(cmd, args...) @@ -485,25 +485,25 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu if options[0] == "show" { var rules []*placement.Rule re.NoError(json.Unmarshal(output, &rules)) - checkRules(rules, isFromAPIServer) + checkRules(rules, isFromPDServer) } else if options[0] == "load" { var rules []*placement.Rule b, _ := os.ReadFile(fname) re.NoError(json.Unmarshal(b, &rules)) - checkRules(rules, isFromAPIServer) + checkRules(rules, isFromPDServer) } else if options[0] == "rule-group" { var group placement.RuleGroup re.NoError(json.Unmarshal(output, &group), string(output)) - checkGroup(group, isFromAPIServer) + checkGroup(group, isFromPDServer) } else if options[0] == "rule-bundle" && options[1] == "get" { var bundle placement.GroupBundle re.NoError(json.Unmarshal(output, &bundle), string(output)) - checkRules(bundle.Rules, isFromAPIServer) + checkRules(bundle.Rules, isFromPDServer) } else if options[0] == "rule-bundle" && options[1] == "load" { var bundles []placement.GroupBundle b, _ := os.ReadFile(fname) re.NoError(json.Unmarshal(b, &bundles), string(output)) - checkRules(bundles[0].Rules, isFromAPIServer) + checkRules(bundles[0].Rules, isFromPDServer) } else { re.Fail("no implement") } @@ -522,13 +522,13 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu re.Equal(uint64(233), sche.GetPersistConfig().GetLeaderScheduleLimit()) re.Equal(7, sche.GetPersistConfig().GetMaxReplicas()) } - // show config from api server rather than scheduling server + // show config from PD service rather than scheduling server testConfig() - // show all config from api server rather than scheduling server + // show all config from PD service rather than scheduling server testConfig("all") - // show replication config from api server rather than scheduling server + // show replication config from PD service rather than scheduling server testConfig("replication") - // show schedule config from api server rather than scheduling server + // show schedule config from PD service rather than scheduling server testConfig("schedule") // Test Rule diff --git a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go index fca00f2fd3c..2734348d7de 100644 --- a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go +++ b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go @@ -41,7 +41,7 @@ func TestKeyspaceGroup(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := pdTests.NewTestAPICluster(ctx, 1) + tc, err := pdTests.NewTestMSCluster(ctx, 1) re.NoError(err) defer tc.Destroy() err = tc.RunInitialServers() @@ -102,7 +102,7 @@ func TestSplitKeyspaceGroup(t *testing.T) { for i := range 129 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestAPICluster(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestMSCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -157,7 +157,7 @@ func TestExternalAllocNodeWhenStart(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -197,7 +197,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestAPICluster(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestMSCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -301,7 +301,7 @@ func TestMergeKeyspaceGroup(t *testing.T) { for i := range 129 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -420,7 +420,7 @@ func TestKeyspaceGroupState(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -511,7 +511,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) diff --git a/tools/pd-ctl/tests/keyspace/keyspace_test.go b/tools/pd-ctl/tests/keyspace/keyspace_test.go index 23a1148cd66..24342b5240e 100644 --- a/tools/pd-ctl/tests/keyspace/keyspace_test.go +++ b/tools/pd-ctl/tests/keyspace/keyspace_test.go @@ -49,7 +49,7 @@ func TestKeyspace(t *testing.T) { for i := 1; i < 10; i++ { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestAPICluster(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestMSCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -155,7 +155,7 @@ func (suite *keyspaceTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) - tc, err := pdTests.NewTestAPICluster(suite.ctx, 1) + tc, err := pdTests.NewTestMSCluster(suite.ctx, 1) re.NoError(err) re.NoError(tc.RunInitialServers()) tc.WaitLeader()