Skip to content

Commit

Permalink
enhance: Enable manual start component in standalone
Browse files Browse the repository at this point in the history
Enable param 'common.standalone.manualStartComponent' to config roles,
which shoule be blocked to run during start a standalone service. then
user can call management api `/management/start` to manual start a role
in standalone service.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Nov 21, 2024
1 parent b983ef9 commit ecda0f7
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 52 deletions.
88 changes: 61 additions & 27 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -393,22 +392,6 @@ func (mr *MilvusRoles) Run() {
defer streaming.Release()
}

enableComponents := []bool{
mr.EnableRootCoord,
mr.EnableProxy,
mr.EnableQueryCoord,
mr.EnableQueryNode,
mr.EnableDataCoord,
mr.EnableDataNode,
mr.EnableIndexCoord,
mr.EnableIndexNode,
mr.EnableStreamingNode,
}
enableComponents = lo.Filter(enableComponents, func(v bool, _ int) bool {
return v
})
healthz.SetComponentNum(len(enableComponents))

expr.Init()
expr.Register("param", paramtable.Get())
mr.setupLogger()
Expand All @@ -427,73 +410,124 @@ func (mr *MilvusRoles) Run() {
}
}

// todo: read config from param table
data := paramtable.Get().CommonCfg.ManualStartComponentInStandalone.GetAsStrings()
manualStartComponents := typeutil.NewSet(data...)

var wg sync.WaitGroup
local := mr.Local

componentMap := make(map[string]component)
var rootCoord, queryCoord, indexCoord, dataCoord component
var proxy, dataNode, indexNode, queryNode, streamingNode component
if mr.EnableRootCoord {
if mr.EnableRootCoord && mr.Local && !manualStartComponents.Contain(typeutil.RootCoordRole) {
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
}

if mr.EnableDataCoord {
if mr.EnableDataCoord && mr.Local && !manualStartComponents.Contain(typeutil.DataCoordRole) {
dataCoord = mr.runDataCoord(ctx, local, &wg)
componentMap[typeutil.DataCoordRole] = dataCoord
}

if mr.EnableIndexCoord {
if mr.EnableIndexCoord && mr.Local && !manualStartComponents.Contain(typeutil.IndexCoordRole) {
indexCoord = mr.runIndexCoord(ctx, local, &wg)
componentMap[typeutil.IndexCoordRole] = indexCoord
}

if mr.EnableQueryCoord {
if mr.EnableQueryCoord && mr.Local && !manualStartComponents.Contain(typeutil.QueryCoordRole) {
queryCoord = mr.runQueryCoord(ctx, local, &wg)
componentMap[typeutil.QueryCoordRole] = queryCoord
}

if mr.EnableQueryNode {
if mr.EnableQueryNode && mr.Local && !manualStartComponents.Contain(typeutil.QueryNodeRole) {
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
}

if mr.EnableDataNode {
if mr.EnableDataNode && mr.Local && !manualStartComponents.Contain(typeutil.DataNodeRole) {
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
}
if mr.EnableIndexNode {
if mr.EnableIndexNode && mr.Local && !manualStartComponents.Contain(typeutil.IndexNodeRole) {
indexNode = mr.runIndexNode(ctx, local, &wg)
componentMap[typeutil.IndexNodeRole] = indexNode
}

if mr.EnableProxy {
if mr.EnableProxy && mr.Local && !manualStartComponents.Contain(typeutil.ProxyRole) {
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
}

if mr.EnableStreamingNode {
if mr.EnableStreamingNode && mr.Local && !manualStartComponents.Contain(typeutil.StreamingNodeRole) {
streamingNode = mr.runStreamingNode(ctx, local, &wg)
componentMap[typeutil.StreamingNodeRole] = streamingNode
}

wg.Wait()

http.RegisterStartComponent(func(role string) error {
if len(role) == 0 {
return fmt.Errorf("start component [%s] in [%s] is not supported", role, mr.ServerType)
}

if componentMap[role] != nil {
log.Warn(fmt.Sprintf("component[%s] already exist in [%s]", role, mr.ServerType))
return nil
}

switch role {
case typeutil.RootCoordRole:
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
case typeutil.DataCoordRole:
dataCoord = mr.runDataCoord(ctx, local, &wg)
componentMap[typeutil.DataCoordRole] = dataCoord
case typeutil.QueryCoordRole:
queryCoord = mr.runQueryCoord(ctx, local, &wg)
componentMap[typeutil.QueryCoordRole] = queryCoord
case typeutil.QueryNodeRole:
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
case typeutil.DataNodeRole:
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
case typeutil.IndexNodeRole:
indexNode = mr.runIndexNode(ctx, local, &wg)
componentMap[typeutil.IndexNodeRole] = indexNode
case typeutil.ProxyRole:
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
default:
return fmt.Errorf("component [%s] in [%s] is not supported", role, mr.ServerType)
}

return nil
})

http.RegisterStopComponent(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
return fmt.Errorf("stop component [%s] in [%s] is not supported", role, mr.ServerType)
}

if componentMap[role] == nil {
return fmt.Errorf("component [%s] in [%s] is not started", role, mr.ServerType)
}

log.Info("unregister component before stop", zap.String("role", role))
healthz.UnRegister(role)
return componentMap[role].Stop()
})

http.RegisterCheckComponentReady(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
if len(role) == 0 {
return fmt.Errorf("check component state for [%s] in [%s] is not supported", role, mr.ServerType)
}

if componentMap[role] == nil {
return fmt.Errorf("component [%s] in [%s] is not started", role, mr.ServerType)
}

// for coord component, if it's in standby state, it will return StateCode_StandBy
code := componentMap[role].Health(context.TODO())
if code != commonpb.StateCode_Healthy {
Expand Down
55 changes: 32 additions & 23 deletions internal/http/healthz/healthz_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"sync"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -52,34 +53,44 @@ type HealthResponse struct {
}

type HealthHandler struct {
indicators []Indicator
indicatorNum int
indicators map[string]Indicator

// unregister role when call stop by restful api
unregisterLock sync.RWMutex
unregisteredRoles map[string]struct{}
lock sync.RWMutex
}

var _ http.Handler = (*HealthHandler)(nil)

var defaultHandler = HealthHandler{}

func Register(indicator Indicator) {
defaultHandler.indicators = append(defaultHandler.indicators, indicator)
}
defaultHandler.lock.Lock()
defer defaultHandler.lock.Unlock()

func SetComponentNum(num int) {
defaultHandler.indicatorNum = num
if defaultHandler.indicators == nil {
defaultHandler.indicators = make(map[string]Indicator)
}
defaultHandler.indicators[indicator.GetName()] = indicator
log.Info("register indicator",
zap.String("name", indicator.GetName()),
zap.Int("num", len(defaultHandler.indicators)),
)
}

func UnRegister(role string) {
defaultHandler.unregisterLock.Lock()
defer defaultHandler.unregisterLock.Unlock()
defaultHandler.lock.Lock()
defer defaultHandler.lock.Unlock()

Check warning on line 82 in internal/http/healthz/healthz_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/http/healthz/healthz_handler.go#L81-L82

Added lines #L81 - L82 were not covered by tests

if defaultHandler.unregisteredRoles == nil {
defaultHandler.unregisteredRoles = make(map[string]struct{})
if defaultHandler.indicators == nil || defaultHandler.indicators[role] == nil {
log.Warn("indicator not found", zap.String("name", role))
return

Check warning on line 86 in internal/http/healthz/healthz_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/http/healthz/healthz_handler.go#L84-L86

Added lines #L84 - L86 were not covered by tests
}
defaultHandler.unregisteredRoles[role] = struct{}{}

delete(defaultHandler.indicators, role)

log.Info("unregister indicator",
zap.String("name", role),
zap.Int("num", len(defaultHandler.indicators)))

Check warning on line 93 in internal/http/healthz/healthz_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/http/healthz/healthz_handler.go#L89-L93

Added lines #L89 - L93 were not covered by tests
}

func Handler() *HealthHandler {
Expand All @@ -90,16 +101,14 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
resp := &HealthResponse{
State: "OK",
}

ctx := context.Background()
handler.lock.RLock()
indicators := lo.Values(handler.indicators)
handler.lock.RUnlock()

healthNum := 0
for _, in := range handler.indicators {
handler.unregisterLock.RLock()
_, unregistered := handler.unregisteredRoles[in.GetName()]
handler.unregisterLock.RUnlock()
if unregistered {
healthNum++
continue
}
for _, in := range indicators {
code := in.Health(ctx)
resp.Detail = append(resp.Detail, &IndicatorState{
Name: in.GetName(),
Expand All @@ -110,8 +119,8 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
}
}

if healthNum != handler.indicatorNum {
resp.State = fmt.Sprintf("Not all components are healthy, %d/%d", healthNum, handler.indicatorNum)
if healthNum != len(handler.indicators) {
resp.State = fmt.Sprintf("Not all components are healthy, %d/%d", healthNum, len(indicators))
}

if resp.State == "OK" {
Expand Down
1 change: 1 addition & 0 deletions internal/http/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (

// for every component, register it's own api to trigger stop and check ready
const (
RouteTriggerStartPath = "/management/start"
RouteTriggerStopPath = "/management/stop"
RouteCheckComponentReady = "/management/check/ready"
RouteWebUI = "/webui/"
Expand Down
20 changes: 20 additions & 0 deletions internal/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ func registerDefaults() {
RegisterWebUIHandler()
}

func RegisterStartComponent(triggerComponentStart func(role string) error) {
// register restful api to trigger stop
Register(&Handler{
Path: RouteTriggerStartPath,
HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
role := req.URL.Query().Get("role")
log.Info("start to trigger component start", zap.String("role", role))
if err := triggerComponentStart(role); err != nil {
log.Warn("failed to trigger component start", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to trigger component start, %s"}`, err.Error())))
return
}
log.Info("finish to trigger component start", zap.String("role", role))
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"msg": "OK"}`))

Check warning on line 124 in internal/http/server.go

View check run for this annotation

Codecov / codecov/patch

internal/http/server.go#L109-L124

Added lines #L109 - L124 were not covered by tests
},
})
}

func RegisterStopComponent(triggerComponentStop func(role string) error) {
// register restful api to trigger stop
Register(&Handler{
Expand Down
2 changes: 0 additions & 2 deletions internal/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
url := "http://localhost:" + DefaultListenPort + "/healthz"
client := http.Client{}

healthz.SetComponentNum(1)
healthz.Register(&MockIndicator{"m1", commonpb.StateCode_Healthy})

req, _ := http.NewRequest(http.MethodGet, url, nil)
Expand All @@ -121,7 +120,6 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
body, _ = io.ReadAll(resp.Body)
suite.Equal("{\"state\":\"OK\",\"detail\":[{\"name\":\"m1\",\"code\":1}]}", string(body))

healthz.SetComponentNum(2)
healthz.Register(&MockIndicator{"m2", commonpb.StateCode_Abnormal})
req, _ = http.NewRequest(http.MethodGet, url, nil)
req.Header.Set("Content-Type", "application/json")
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ type commonConfig struct {
ReadOnlyPrivileges ParamItem `refreshable:"false"`
ReadWritePrivileges ParamItem `refreshable:"false"`
AdminPrivileges ParamItem `refreshable:"false"`

ManualStartComponentInStandalone ParamItem `refreshable:"false"`
}

func (p *commonConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -924,6 +926,13 @@ This helps Milvus-CDC synchronize incremental data`,
Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`,
}
p.AdminPrivileges.Init(base.mgr)

p.ManualStartComponentInStandalone = ParamItem{
Key: "common.standalone.manualStartComponent",
Doc: `use to override the default value of manualStartComponent, example: "querynode,datanode,indexnode,proxy"`,
Version: "2.4.17",
}
p.ManualStartComponentInStandalone.Init(base.mgr)
}

type gpuConfig struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.ManualStartComponentInStandalone.GetAsStrings()))
})

t.Run("test rootCoordConfig", func(t *testing.T) {
Expand Down

0 comments on commit ecda0f7

Please sign in to comment.