Skip to content

Commit

Permalink
remove api mode concept
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Dec 27, 2024
1 parent 4d8009d commit 3c8d130
Show file tree
Hide file tree
Showing 52 changed files with 260 additions and 270 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
// Handle compatibility issue in case of PD/PD service doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down
2 changes: 1 addition & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// API server will return a JSON body containing the detailed error message
// PD service will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
Expand Down
2 changes: 1 addition & 1 deletion client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
c.tsoSvcDiscovery = newTSOSvcDiscovery
// Close the old TSO service discovery safely after both the old client and service discovery are replaced.
if oldTSOSvcDiscovery != nil {
// We are switching from API service mode to PD service mode, so delete the old tso microservice discovery.
// We are switching from PD service mode to PD service mode, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/servicediscovery/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ var (
_ TSOEventSource = (*pdServiceDiscovery)(nil)
)

// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based
// pdServiceDiscovery is the service discovery client of PD/PD service which is quorum based
type pdServiceDiscovery struct {
isInitialized bool

Expand Down Expand Up @@ -670,7 +670,7 @@ func (c *pdServiceDiscovery) SetKeyspaceID(keyspaceID uint32) {

// GetKeyspaceGroupID returns the ID of the keyspace group
func (*pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
// PD/API service only supports the default keyspace group
// PD/PD service only supports the default keyspace group
return constants.DefaultKeyspaceGroupID
}

Expand Down
14 changes: 7 additions & 7 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewServiceCommand() *cobra.Command {
}
cmd.AddCommand(NewTSOServiceCommand())
cmd.AddCommand(NewSchedulingServiceCommand())
cmd.AddCommand(NewAPIServiceCommand())
cmd.AddCommand(NewPDServiceCommand())
return cmd
}

Expand Down Expand Up @@ -128,12 +128,12 @@ func NewSchedulingServiceCommand() *cobra.Command {
return cmd
}

// NewAPIServiceCommand returns the API service command.
func NewAPIServiceCommand() *cobra.Command {
// NewPDServiceCommand returns the PD service command.
func NewPDServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: apiMode,
Short: "Run the API service",
Run: createAPIServerWrapper,
Short: "Run the PD service",
Run: createPDServiceWrapper,
}
addFlags(cmd)
return cmd
Expand All @@ -160,7 +160,7 @@ func addFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
}

func createAPIServerWrapper(cmd *cobra.Command, args []string) {
func createPDServiceWrapper(cmd *cobra.Command, args []string) {
start(cmd, args, cmd.CalledAs())
}

Expand Down Expand Up @@ -219,7 +219,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
defer log.Sync()
memory.InitMemoryHook()
if len(services) != 0 {
versioninfo.Log(server.APIServiceMode)
versioninfo.Log(server.PDServiceMode)
} else {
versioninfo.Log(server.PDMode)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful PD service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
}
continue
}
l := builder(srv)
r.services[serviceName] = l
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful API service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful PD service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Cluster struct {
storage storage.Storage
coordinator *schedule.Coordinator
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
pdLeader atomic.Value
running atomic.Bool

// heartbeatRunner is used to process the subtree update task asynchronously.
Expand Down Expand Up @@ -227,7 +227,7 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) {
client, err := c.getAPIServerLeaderClient()
client, err := c.getPDLeaderClient()
if err != nil {
return 0, err
}
Expand All @@ -241,11 +241,11 @@ func (c *Cluster) AllocID() (uint64, error) {
return resp.GetId(), nil
}

func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) {
cli := c.apiServerLeader.Load()
func (c *Cluster) getPDLeaderClient() (pdpb.PDClient, error) {
cli := c.pdLeader.Load()
if cli == nil {
c.triggerMembershipCheck()
return nil, errors.New("API server leader is not found")
return nil, errors.New("PD leader is not found")
}
return cli.(pdpb.PDClient), nil
}
Expand All @@ -257,10 +257,10 @@ func (c *Cluster) triggerMembershipCheck() {
}
}

// SwitchAPIServerLeader switches the API server leader.
func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
old := c.apiServerLeader.Load()
return c.apiServerLeader.CompareAndSwap(old, new)
// SwitchPDLeader switches the PD server leader.
func (c *Cluster) SwitchPDLeader(new pdpb.PDClient) bool {
old := c.pdLeader.Load()
return c.pdLeader.CompareAndSwap(old, new)
}

func trySend(notifier chan struct{}) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig {
o.SetClusterVersion(&cfg.ClusterVersion)
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
// storeConfig will be fetched from TiKV by PD API server,
// storeConfig will be fetched from TiKV by PD service,
// so we just set an empty value here first.
o.storeConfig.Store(&sc.StoreConfig{})
o.ttl = ttl
Expand Down Expand Up @@ -748,11 +748,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from API server now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
func (*PersistConfig) AddSchedulerCfg(types.CheckerSchedulerType, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from API server now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
func (*PersistConfig) RemoveSchedulerCfg(types.CheckerSchedulerType) {}

// CheckLabelProperty checks if the label property is satisfied.
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD API server for any configuration changes.
// Watcher is used to watch the PD service for any configuration changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand Down Expand Up @@ -76,7 +76,7 @@ type persistedConfig struct {
Store sc.StoreConfig `json:"store"`
}

// NewWatcher creates a new watcher to watch the config meta change from PD API server.
// NewWatcher creates a new watcher to watch the config meta change from PD service.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
// TODO: if we need to send the error back to PD service.
log.Error("failed handle region heartbeat", zap.Error(err))
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD API server for any meta changes.
// Watcher is used to watch the PD service for any meta changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand All @@ -48,7 +48,7 @@ type Watcher struct {
storeWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the meta change from PD API server.
// NewWatcher creates a new watcher to watch the meta change from PD service.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD API server for any Placement Rule changes.
// Watcher is used to watch the PD service for any Placement Rule changes.
type Watcher struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -74,7 +74,7 @@ type Watcher struct {
patch *placement.RuleConfigPatch
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
// NewWatcher creates a new watcher to watch the Placement Rule change from PD service.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type Server struct {
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint

// for watching the PD API server meta info updates that are related to the scheduling.
// for watching the PD service meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
metaWatcher *meta.Watcher
Expand Down Expand Up @@ -169,10 +169,10 @@ func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
s.serverLoopWg.Add(2)
go s.primaryElectionLoop()
go s.updateAPIServerMemberLoop()
go s.updatePDServiceMemberLoop()
}

func (s *Server) updateAPIServerMemberLoop() {
func (s *Server) updatePDServiceMemberLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *Server) updateAPIServerMemberLoop() {
// double check
break
}
if s.cluster.SwitchAPIServerLeader(pdpb.NewPDClient(cc)) {
if s.cluster.SwitchPDLeader(pdpb.NewPDClient(cc)) {
if status.Leader != curLeader {
log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/utils/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ const (

// MicroserviceRootPath is the root path of microservice in etcd.
MicroserviceRootPath = "/ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// PDServiceName is the name of pd server.
PDServiceName = "pd"
// TSOServiceName is the name of tso server.
TSOServiceName = "tso"
// SchedulingServiceName is the name of scheduling server.
Expand Down
2 changes: 1 addition & 1 deletion pkg/member/election_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
// for in PD/API service or the tsopb.Participant in the micro services.
// for in PD/PD service or the tsopb.Participant in the micro services.
type ElectionLeader interface {
// GetListenUrls returns the listen urls
GetListenUrls() []string
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
// TODO: handle the plugin in API service mode.
// TODO: handle the plugin in PD service mode.
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Controller struct {
// and used in the PD leader service mode now.
schedulers map[string]*ScheduleController
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
// which will only be initialized and used in the API service mode now.
// which will only be initialized and used in the PD service mode now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ type KeyspaceGroupManager struct {
// Value: discover.ServiceRegistryEntry
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/API service. It's in the format of "/pd/{cluster_id}".
// the PD/PD service. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
// 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the
// storage endpoint.
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsAPIServiceMode() {
if !h.s.IsPDServiceMode() {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down Expand Up @@ -223,7 +223,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
clientUrls = leader.GetClientUrls()
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/API servers.
// Prevent more than one redirection among PD/PD service.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,5 @@ func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error {
}

func buildMsg(err error) string {
return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
return fmt.Sprintf("This operation was executed in PD service but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
}
4 changes: 2 additions & 2 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// Following requests are **not** redirected:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
// Because the writing of all the config of the scheduling service is in the API server,
// Because the writing of all the config of the scheduling service is in the PD service,
// we should not post and delete the scheduler directly in the scheduling service.
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
Expand Down Expand Up @@ -153,7 +153,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/config/placement-rule",
constant.SchedulingServiceName,
[]string{http.MethodGet}),
// because the writing of all the meta information of the scheduling service is in the API server,
// because the writing of all the meta information of the scheduling service is in the PD service,
// we should not post and delete the scheduler directly in the scheduling service.
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func RegisterMicroService(r *gin.RouterGroup) {
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsAPIServiceMode() {
if !svr.IsPDServiceMode() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service")
return
}
Expand All @@ -65,7 +65,7 @@ func GetMembers(c *gin.Context) {
// @Router /ms/primary/{service} [get]
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsAPIServiceMode() {
if !svr.IsPDServiceMode() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service")
return
}
Expand Down
Loading

0 comments on commit 3c8d130

Please sign in to comment.