Skip to content

Commit

Permalink
enhance: Enable manual start component in standalone
Browse files Browse the repository at this point in the history
Add the param 'common.manualStartComponent' to configure the roles
which should be blocked from running when a standalone service starts. The
user can then call the management API `/management/start` to manually start a role
in the standalone service.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Nov 25, 2024
1 parent fbb68ca commit f343af4
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 52 deletions.
88 changes: 61 additions & 27 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -398,22 +397,6 @@ func (mr *MilvusRoles) Run() {
defer streaming.Release()
}

enableComponents := []bool{
mr.EnableRootCoord,
mr.EnableProxy,
mr.EnableQueryCoord,
mr.EnableQueryNode,
mr.EnableDataCoord,
mr.EnableDataNode,
mr.EnableIndexCoord,
mr.EnableIndexNode,
mr.EnableStreamingNode,
}
enableComponents = lo.Filter(enableComponents, func(v bool, _ int) bool {
return v
})
healthz.SetComponentNum(len(enableComponents))

expr.Init()
expr.Register("param", paramtable.Get())
mr.setupLogger()
Expand All @@ -432,73 +415,124 @@ func (mr *MilvusRoles) Run() {
}
}

data := paramtable.Get().CommonCfg.ManualStartComponents.GetAsStrings()
manualStartComponents := typeutil.NewSet(data...)
log.Info("manual start components: ", zap.Any("components", data))

var wg sync.WaitGroup
local := mr.Local

componentMap := make(map[string]component)
var rootCoord, queryCoord, indexCoord, dataCoord component
var proxy, dataNode, indexNode, queryNode, streamingNode component
if mr.EnableRootCoord {
if mr.EnableRootCoord && !manualStartComponents.Contain(typeutil.RootCoordRole) {
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
}

if mr.EnableDataCoord {
if mr.EnableDataCoord && !manualStartComponents.Contain(typeutil.DataCoordRole) {
dataCoord = mr.runDataCoord(ctx, local, &wg)
componentMap[typeutil.DataCoordRole] = dataCoord
}

if mr.EnableIndexCoord {
if mr.EnableIndexCoord && !manualStartComponents.Contain(typeutil.IndexCoordRole) {
indexCoord = mr.runIndexCoord(ctx, local, &wg)
componentMap[typeutil.IndexCoordRole] = indexCoord
}

if mr.EnableQueryCoord {
if mr.EnableQueryCoord && !manualStartComponents.Contain(typeutil.QueryCoordRole) {
queryCoord = mr.runQueryCoord(ctx, local, &wg)
componentMap[typeutil.QueryCoordRole] = queryCoord
}

if mr.EnableQueryNode {
if mr.EnableQueryNode && !manualStartComponents.Contain(typeutil.QueryNodeRole) {
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
}

if mr.EnableDataNode {
if mr.EnableDataNode && !manualStartComponents.Contain(typeutil.DataNodeRole) {
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
}
if mr.EnableIndexNode {
if mr.EnableIndexNode && !manualStartComponents.Contain(typeutil.IndexNodeRole) {
indexNode = mr.runIndexNode(ctx, local, &wg)
componentMap[typeutil.IndexNodeRole] = indexNode
}

if mr.EnableProxy {
if mr.EnableProxy && !manualStartComponents.Contain(typeutil.ProxyRole) {
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
}

if mr.EnableStreamingNode {
if mr.EnableStreamingNode && !manualStartComponents.Contain(typeutil.StreamingNodeRole) {
streamingNode = mr.runStreamingNode(ctx, local, &wg)
componentMap[typeutil.StreamingNodeRole] = streamingNode
}

wg.Wait()

http.RegisterStartComponent(func(role string) error {
if len(role) == 0 {
return fmt.Errorf("start component [%s] in [%s] is not supported", role, mr.ServerType)
}

if componentMap[role] != nil {
log.Warn(fmt.Sprintf("component[%s] already exist in [%s]", role, mr.ServerType))
return nil
}

switch role {
case typeutil.RootCoordRole:
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
case typeutil.DataCoordRole:
dataCoord = mr.runDataCoord(ctx, local, &wg)
componentMap[typeutil.DataCoordRole] = dataCoord
case typeutil.QueryCoordRole:
queryCoord = mr.runQueryCoord(ctx, local, &wg)
componentMap[typeutil.QueryCoordRole] = queryCoord
case typeutil.QueryNodeRole:
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
case typeutil.DataNodeRole:
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
case typeutil.IndexNodeRole:
indexNode = mr.runIndexNode(ctx, local, &wg)
componentMap[typeutil.IndexNodeRole] = indexNode
case typeutil.ProxyRole:
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
default:
return fmt.Errorf("component [%s] in [%s] is not supported", role, mr.ServerType)
}

return nil
})

http.RegisterStopComponent(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
return fmt.Errorf("stop component [%s] in [%s] is not supported", role, mr.ServerType)
}

if componentMap[role] == nil {
return fmt.Errorf("component [%s] in [%s] is not started", role, mr.ServerType)
}

log.Info("unregister component before stop", zap.String("role", role))
healthz.UnRegister(role)
return componentMap[role].Stop()
})

http.RegisterCheckComponentReady(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
if len(role) == 0 {
return fmt.Errorf("check component state for [%s] in [%s] is not supported", role, mr.ServerType)
}

if componentMap[role] == nil {
return fmt.Errorf("component [%s] in [%s] is not started", role, mr.ServerType)
}

// for coord component, if it's in standby state, it will return StateCode_StandBy
code := componentMap[role].Health(context.TODO())
if code != commonpb.StateCode_Healthy {
Expand Down
55 changes: 32 additions & 23 deletions internal/http/healthz/healthz_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"sync"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -52,34 +53,44 @@ type HealthResponse struct {
}

type HealthHandler struct {
indicators []Indicator
indicatorNum int
indicators map[string]Indicator

// unregister role when call stop by restful api
unregisterLock sync.RWMutex
unregisteredRoles map[string]struct{}
lock sync.RWMutex
}

var _ http.Handler = (*HealthHandler)(nil)

var defaultHandler = HealthHandler{}

func Register(indicator Indicator) {
defaultHandler.indicators = append(defaultHandler.indicators, indicator)
}
defaultHandler.lock.Lock()
defer defaultHandler.lock.Unlock()

func SetComponentNum(num int) {
defaultHandler.indicatorNum = num
if defaultHandler.indicators == nil {
defaultHandler.indicators = make(map[string]Indicator)
}
defaultHandler.indicators[indicator.GetName()] = indicator
log.Info("register indicator",
zap.String("name", indicator.GetName()),
zap.Int("num", len(defaultHandler.indicators)),
)
}

func UnRegister(role string) {
defaultHandler.unregisterLock.Lock()
defer defaultHandler.unregisterLock.Unlock()
defaultHandler.lock.Lock()
defer defaultHandler.lock.Unlock()

Check warning on line 82 in internal/http/healthz/healthz_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/http/healthz/healthz_handler.go#L81-L82

Added lines #L81 - L82 were not covered by tests

if defaultHandler.unregisteredRoles == nil {
defaultHandler.unregisteredRoles = make(map[string]struct{})
if defaultHandler.indicators == nil || defaultHandler.indicators[role] == nil {
log.Warn("indicator not found", zap.String("name", role))
return

Check warning on line 86 in internal/http/healthz/healthz_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/http/healthz/healthz_handler.go#L84-L86

Added lines #L84 - L86 were not covered by tests
}
defaultHandler.unregisteredRoles[role] = struct{}{}

delete(defaultHandler.indicators, role)

log.Info("unregister indicator",
zap.String("name", role),
zap.Int("num", len(defaultHandler.indicators)))

Check warning on line 93 in internal/http/healthz/healthz_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/http/healthz/healthz_handler.go#L89-L93

Added lines #L89 - L93 were not covered by tests
}

func Handler() *HealthHandler {
Expand All @@ -90,16 +101,14 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
resp := &HealthResponse{
State: "OK",
}

ctx := context.Background()
handler.lock.RLock()
indicators := lo.Values(handler.indicators)
handler.lock.RUnlock()

healthNum := 0
for _, in := range handler.indicators {
handler.unregisterLock.RLock()
_, unregistered := handler.unregisteredRoles[in.GetName()]
handler.unregisterLock.RUnlock()
if unregistered {
healthNum++
continue
}
for _, in := range indicators {
code := in.Health(ctx)
resp.Detail = append(resp.Detail, &IndicatorState{
Name: in.GetName(),
Expand All @@ -110,8 +119,8 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
}
}

if healthNum != handler.indicatorNum {
resp.State = fmt.Sprintf("Not all components are healthy, %d/%d", healthNum, handler.indicatorNum)
if healthNum != len(handler.indicators) {
resp.State = fmt.Sprintf("Not all components are healthy, %d/%d", healthNum, len(indicators))
}

if resp.State == "OK" {
Expand Down
1 change: 1 addition & 0 deletions internal/http/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (

// for every component, register its own api to trigger start, trigger stop and check ready
const (
RouteTriggerStartPath = "/management/start"
RouteTriggerStopPath = "/management/stop"
RouteCheckComponentReady = "/management/check/ready"
RouteWebUI = "/webui/"
Expand Down
20 changes: 20 additions & 0 deletions internal/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ func registerDefaults() {
RegisterWebUIHandler()
}

// RegisterStartComponent registers an HTTP handler at RouteTriggerStartPath
// that starts a component on demand.
//
// The handler reads the "role" query parameter from the request URL and passes
// it to triggerComponentStart. If the callback returns an error, the handler
// replies with 500 and a JSON body carrying the error text; otherwise it
// replies with 200 and `{"msg": "OK"}`.
func RegisterStartComponent(triggerComponentStart func(role string) error) {
// register restful api to trigger start
Register(&Handler{
Path: RouteTriggerStartPath,
HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
// an absent "role" parameter yields the empty string; validation is left to the callback
role := req.URL.Query().Get("role")
log.Info("start to trigger component start", zap.String("role", role))
if err := triggerComponentStart(role); err != nil {
log.Warn("failed to trigger component start", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
// NOTE(review): the error text is interpolated into the JSON body without
// escaping, so a message containing `"` or `\` produces invalid JSON.
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to trigger component start, %s"}`, err.Error())))
return
}
log.Info("finish to trigger component start", zap.String("role", role))
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"msg": "OK"}`))

Check warning on line 124 in internal/http/server.go

View check run for this annotation

Codecov / codecov/patch

internal/http/server.go#L109-L124

Added lines #L109 - L124 were not covered by tests
},
})
}

func RegisterStopComponent(triggerComponentStop func(role string) error) {
// register restful api to trigger stop
Register(&Handler{
Expand Down
2 changes: 0 additions & 2 deletions internal/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
url := "http://localhost:" + DefaultListenPort + "/healthz"
client := http.Client{}

healthz.SetComponentNum(1)
healthz.Register(&MockIndicator{"m1", commonpb.StateCode_Healthy})

req, _ := http.NewRequest(http.MethodGet, url, nil)
Expand All @@ -121,7 +120,6 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
body, _ = io.ReadAll(resp.Body)
suite.Equal("{\"state\":\"OK\",\"detail\":[{\"name\":\"m1\",\"code\":1}]}", string(body))

healthz.SetComponentNum(2)
healthz.Register(&MockIndicator{"m2", commonpb.StateCode_Abnormal})
req, _ = http.NewRequest(http.MethodGet, url, nil)
req.Header.Set("Content-Type", "application/json")
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ type commonConfig struct {
ReadOnlyPrivileges ParamItem `refreshable:"false"`
ReadWritePrivileges ParamItem `refreshable:"false"`
AdminPrivileges ParamItem `refreshable:"false"`

ManualStartComponents ParamItem `refreshable:"false"`
}

func (p *commonConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -924,6 +926,13 @@ This helps Milvus-CDC synchronize incremental data`,
Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`,
}
p.AdminPrivileges.Init(base.mgr)

p.ManualStartComponents = ParamItem{
Key: "common.manualStartComponent",
Doc: `if component has been set to manualStartComponent, it will skip start progress, then can startup by management api, example: "querynode,datanode,indexnode,proxy"`,
Version: "2.4.17",
}
p.ManualStartComponents.Init(base.mgr)
}

type gpuConfig struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.ManualStartComponents.GetAsStrings()))
})

t.Run("test rootCoordConfig", func(t *testing.T) {
Expand Down

0 comments on commit f343af4

Please sign in to comment.