diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 4698ede3259ad..6044936acfa62 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -30,7 +30,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/samber/lo" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -398,22 +397,6 @@ func (mr *MilvusRoles) Run() { defer streaming.Release() } - enableComponents := []bool{ - mr.EnableRootCoord, - mr.EnableProxy, - mr.EnableQueryCoord, - mr.EnableQueryNode, - mr.EnableDataCoord, - mr.EnableDataNode, - mr.EnableIndexCoord, - mr.EnableIndexNode, - mr.EnableStreamingNode, - } - enableComponents = lo.Filter(enableComponents, func(v bool, _ int) bool { - return v - }) - healthz.SetComponentNum(len(enableComponents)) - expr.Init() expr.Register("param", paramtable.Get()) mr.setupLogger() @@ -432,73 +415,124 @@ func (mr *MilvusRoles) Run() { } } + data := paramtable.Get().CommonCfg.ManualStartComponents.GetAsStrings() + manualStartComponents := typeutil.NewSet(data...) + log.Info("manual start components: ", zap.Any("components", data)) + var wg sync.WaitGroup local := mr.Local componentMap := make(map[string]component) var rootCoord, queryCoord, indexCoord, dataCoord component var proxy, dataNode, indexNode, queryNode, streamingNode component - if mr.EnableRootCoord { + if mr.EnableRootCoord && !manualStartComponents.Contain(typeutil.RootCoordRole) { rootCoord = mr.runRootCoord(ctx, local, &wg) componentMap[typeutil.RootCoordRole] = rootCoord } - if mr.EnableDataCoord { + if mr.EnableDataCoord && !manualStartComponents.Contain(typeutil.DataCoordRole) { dataCoord = mr.runDataCoord(ctx, local, &wg) componentMap[typeutil.DataCoordRole] = dataCoord } - if mr.EnableIndexCoord { + if mr.EnableIndexCoord && !manualStartComponents.Contain(typeutil.IndexCoordRole) { indexCoord = mr.runIndexCoord(ctx, local, &wg) componentMap[typeutil.IndexCoordRole] = indexCoord } - if mr.EnableQueryCoord { + if mr.EnableQueryCoord && !manualStartComponents.Contain(typeutil.QueryCoordRole) { queryCoord = mr.runQueryCoord(ctx, local, &wg) componentMap[typeutil.QueryCoordRole] = queryCoord } - if mr.EnableQueryNode { + if mr.EnableQueryNode && !manualStartComponents.Contain(typeutil.QueryNodeRole) { queryNode = mr.runQueryNode(ctx, local, &wg) componentMap[typeutil.QueryNodeRole] = queryNode } - if mr.EnableDataNode { + if mr.EnableDataNode && !manualStartComponents.Contain(typeutil.DataNodeRole) { dataNode = mr.runDataNode(ctx, local, &wg) componentMap[typeutil.DataNodeRole] = dataNode } - if mr.EnableIndexNode { + if mr.EnableIndexNode && !manualStartComponents.Contain(typeutil.IndexNodeRole) { indexNode = mr.runIndexNode(ctx, local, &wg) componentMap[typeutil.IndexNodeRole] = indexNode } - if mr.EnableProxy { + if mr.EnableProxy && !manualStartComponents.Contain(typeutil.ProxyRole) { proxy = mr.runProxy(ctx, local, &wg) componentMap[typeutil.ProxyRole] = proxy } - if mr.EnableStreamingNode { + if mr.EnableStreamingNode && !manualStartComponents.Contain(typeutil.StreamingNodeRole) { streamingNode = mr.runStreamingNode(ctx, local, &wg) componentMap[typeutil.StreamingNodeRole] = streamingNode } wg.Wait() + http.RegisterStartComponent(func(role string) error { + if len(role) == 0 { + return fmt.Errorf("start component [%s] in [%s] is not supported", role, mr.ServerType) + } + + if componentMap[role] != nil { + log.Warn(fmt.Sprintf("component[%s] already exist in [%s]", role, mr.ServerType)) + return nil + } + + switch role { + case typeutil.RootCoordRole: + rootCoord = mr.runRootCoord(ctx, local, &wg) + componentMap[typeutil.RootCoordRole] = rootCoord + case typeutil.DataCoordRole: + dataCoord = mr.runDataCoord(ctx, local, &wg) + componentMap[typeutil.DataCoordRole] = dataCoord + case typeutil.QueryCoordRole: + queryCoord = mr.runQueryCoord(ctx, local, &wg) + componentMap[typeutil.QueryCoordRole] = queryCoord + case typeutil.QueryNodeRole: + queryNode = mr.runQueryNode(ctx, local, &wg) + componentMap[typeutil.QueryNodeRole] = queryNode + case typeutil.DataNodeRole: + dataNode = mr.runDataNode(ctx, local, &wg) + componentMap[typeutil.DataNodeRole] = dataNode + case typeutil.IndexNodeRole: + indexNode = mr.runIndexNode(ctx, local, &wg) + componentMap[typeutil.IndexNodeRole] = indexNode + case typeutil.ProxyRole: + proxy = mr.runProxy(ctx, local, &wg) + componentMap[typeutil.ProxyRole] = proxy + default: + return fmt.Errorf("component [%s] in [%s] is not supported", role, mr.ServerType) + } + + return nil + }) + http.RegisterStopComponent(func(role string) error { if len(role) == 0 || componentMap[role] == nil { return fmt.Errorf("stop component [%s] in [%s] is not supported", role, mr.ServerType) } + if componentMap[role] == nil { + return fmt.Errorf("component [%s] in [%s] is not started", role, mr.ServerType) + } + log.Info("unregister component before stop", zap.String("role", role)) healthz.UnRegister(role) return componentMap[role].Stop() }) http.RegisterCheckComponentReady(func(role string) error { - if len(role) == 0 || componentMap[role] == nil { + if len(role) == 0 { return fmt.Errorf("check component state for [%s] in [%s] is not supported", role, mr.ServerType) } + if componentMap[role] == nil { + return fmt.Errorf("component [%s] in [%s] is not started", role, mr.ServerType) + } + // for coord component, if it's in standby state, it will return StateCode_StandBy code := componentMap[role].Health(context.TODO()) if code != commonpb.StateCode_Healthy { diff --git a/internal/http/healthz/healthz_handler.go b/internal/http/healthz/healthz_handler.go index 62c98e1cd83cc..214919d8016b4 100644 --- a/internal/http/healthz/healthz_handler.go +++ b/internal/http/healthz/healthz_handler.go @@ -22,6 +22,7 @@ import ( "net/http" "sync" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -52,12 +53,10 @@ type HealthResponse struct { } type HealthHandler struct { - indicators []Indicator - indicatorNum int + indicators map[string]Indicator // unregister role when call stop by restful api - unregisterLock sync.RWMutex - unregisteredRoles map[string]struct{} + lock sync.RWMutex } var _ http.Handler = (*HealthHandler)(nil) @@ -65,21 +64,33 @@ var _ http.Handler = (*HealthHandler)(nil) var defaultHandler = HealthHandler{} func Register(indicator Indicator) { - defaultHandler.indicators = append(defaultHandler.indicators, indicator) -} + defaultHandler.lock.Lock() + defer defaultHandler.lock.Unlock() -func SetComponentNum(num int) { - defaultHandler.indicatorNum = num + if defaultHandler.indicators == nil { + defaultHandler.indicators = make(map[string]Indicator) + } + defaultHandler.indicators[indicator.GetName()] = indicator + log.Info("register indicator", + zap.String("name", indicator.GetName()), + zap.Int("num", len(defaultHandler.indicators)), + ) } func UnRegister(role string) { - defaultHandler.unregisterLock.Lock() - defer defaultHandler.unregisterLock.Unlock() + defaultHandler.lock.Lock() + defer defaultHandler.lock.Unlock() - if defaultHandler.unregisteredRoles == nil { - defaultHandler.unregisteredRoles = make(map[string]struct{}) + if defaultHandler.indicators == nil || defaultHandler.indicators[role] == nil { + log.Warn("indicator not found", zap.String("name", role)) + return } - defaultHandler.unregisteredRoles[role] = struct{}{} + + delete(defaultHandler.indicators, role) + + log.Info("unregister indicator", + zap.String("name", role), + zap.Int("num", len(defaultHandler.indicators))) } func Handler() *HealthHandler { @@ -90,16 +101,14 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) resp := &HealthResponse{ State: "OK", } + ctx := context.Background() + handler.lock.RLock() + indicators := lo.Values(handler.indicators) + handler.lock.RUnlock() + healthNum := 0 - for _, in := range handler.indicators { - handler.unregisterLock.RLock() - _, unregistered := handler.unregisteredRoles[in.GetName()] - handler.unregisterLock.RUnlock() - if unregistered { - healthNum++ - continue - } + for _, in := range indicators { code := in.Health(ctx) resp.Detail = append(resp.Detail, &IndicatorState{ Name: in.GetName(), @@ -110,8 +119,8 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) } } - if healthNum != handler.indicatorNum { - resp.State = fmt.Sprintf("Not all components are healthy, %d/%d", healthNum, handler.indicatorNum) + if healthNum != len(handler.indicators) { + resp.State = fmt.Sprintf("Not all components are healthy, %d/%d", healthNum, len(indicators)) } if resp.State == "OK" { diff --git a/internal/http/router.go b/internal/http/router.go index f457115183868..aa642e781e2d3 100644 --- a/internal/http/router.go +++ b/internal/http/router.go @@ -41,6 +41,7 @@ const ( // for every component, register it's own api to trigger stop and check ready const ( + RouteTriggerStartPath = "/management/start" RouteTriggerStopPath = "/management/stop" RouteCheckComponentReady = "/management/check/ready" RouteWebUI = "/webui/" diff --git a/internal/http/server.go b/internal/http/server.go index f9932f72a73af..45d303ef7d9a4 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -106,6 +106,26 @@ func registerDefaults() { RegisterWebUIHandler() } +func RegisterStartComponent(triggerComponentStart func(role string) error) { + // register restful api to trigger stop + Register(&Handler{ + Path: RouteTriggerStartPath, + HandlerFunc: func(w http.ResponseWriter, req *http.Request) { + role := req.URL.Query().Get("role") + log.Info("start to trigger component start", zap.String("role", role)) + if err := triggerComponentStart(role); err != nil { + log.Warn("failed to trigger component start", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf(`{"msg": "failed to trigger component start, %s"}`, err.Error()))) + return + } + log.Info("finish to trigger component start", zap.String("role", role)) + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"msg": "OK"}`)) + }, + }) +} + func RegisterStopComponent(triggerComponentStop func(role string) error) { // register restful api to trigger stop Register(&Handler{ diff --git a/internal/http/server_test.go b/internal/http/server_test.go index e2d1090c0d42a..5da501ded1bae 100644 --- a/internal/http/server_test.go +++ b/internal/http/server_test.go @@ -103,7 +103,6 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() { url := "http://localhost:" + DefaultListenPort + "/healthz" client := http.Client{} - healthz.SetComponentNum(1) healthz.Register(&MockIndicator{"m1", commonpb.StateCode_Healthy}) req, _ := http.NewRequest(http.MethodGet, url, nil) @@ -121,7 +120,6 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() { body, _ = io.ReadAll(resp.Body) suite.Equal("{\"state\":\"OK\",\"detail\":[{\"name\":\"m1\",\"code\":1}]}", string(body)) - healthz.SetComponentNum(2) healthz.Register(&MockIndicator{"m2", commonpb.StateCode_Abnormal}) req, _ = http.NewRequest(http.MethodGet, url, nil) req.Header.Set("Content-Type", "application/json") diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 93b5fb62191b3..ad44e4d6fc3fa 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -283,6 +283,8 @@ type commonConfig struct { ReadOnlyPrivileges ParamItem `refreshable:"false"` ReadWritePrivileges ParamItem `refreshable:"false"` AdminPrivileges ParamItem `refreshable:"false"` + + ManualStartComponents ParamItem `refreshable:"false"` } func (p *commonConfig) init(base *BaseTable) { @@ -924,6 +926,13 @@ This helps Milvus-CDC synchronize incremental data`, Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`, } p.AdminPrivileges.Init(base.mgr) + + p.ManualStartComponents = ParamItem{ + Key: "common.manualStartComponent", + Doc: `if component has been set to manualStartComponent, it will skip start progress, then can startup by management api, example: "querynode,datanode,indexnode,proxy"`, + Version: "2.4.17", + } + p.ManualStartComponents.Init(base.mgr) } type gpuConfig struct { diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 529ec7a2d969b..84f7a49c358fd 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -134,6 +134,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings())) + assert.Equal(t, 0, len(Params.ManualStartComponents.GetAsStrings())) }) t.Run("test rootCoordConfig", func(t *testing.T) {