From 7e477528d387cc47c079effb300630875edf8058 Mon Sep 17 00:00:00 2001 From: chyezh Date: Mon, 25 Nov 2024 11:23:30 +0800 Subject: [PATCH 1/2] enhance: add switch for local rpc enabled Signed-off-by: chyezh --- configs/milvus.yaml | 1 + internal/coordinator/coordclient/registry.go | 7 ++++--- internal/coordinator/coordclient/registry_test.go | 4 ++++ pkg/util/paramtable/component_param.go | 12 ++++++++++++ pkg/util/paramtable/component_param_test.go | 4 ++++ 5 files changed, 25 insertions(+), 3 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index a464e676c6b6d..d38c74cf9d114 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -871,6 +871,7 @@ common: usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction + localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode. # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/coordinator/coordclient/registry.go b/internal/coordinator/coordclient/registry.go index f143a29a9f4f0..a79b11912d7b9 100644 --- a/internal/coordinator/coordclient/registry.go +++ b/internal/coordinator/coordclient/registry.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -57,7 +58,7 @@ func EnableLocalClientRole(cfg *LocalClientRoleConfig) { // RegisterQueryCoordServer register query coord server func RegisterQueryCoordServer(server querypb.QueryCoordServer) { - if !enableLocal.EnableQueryCoord { + if !enableLocal.EnableQueryCoord || !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { return } newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient) @@ -67,7 +68,7 @@ func RegisterQueryCoordServer(server querypb.QueryCoordServer) { // RegsterDataCoordServer register data coord server func RegisterDataCoordServer(server datapb.DataCoordServer) { - if !enableLocal.EnableDataCoord { + if !enableLocal.EnableDataCoord || !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { return } newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient) @@ -77,7 +78,7 @@ func RegisterDataCoordServer(server datapb.DataCoordServer) { // RegisterRootCoordServer register root coord server func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { - if !enableLocal.EnableRootCoord { + if !enableLocal.EnableRootCoord || !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { return } newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient) diff --git a/internal/coordinator/coordclient/registry_test.go b/internal/coordinator/coordclient/registry_test.go index 8ed97ac3d5004..6752b637cc4f4 100644 --- a/internal/coordinator/coordclient/registry_test.go +++ b/internal/coordinator/coordclient/registry_test.go @@ -9,10 +9,14 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestRegistry(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(paramtable.Get().CommonCfg.LocalRPCEnabled.Key, "true") + assert.False(t, enableLocal.EnableQueryCoord) assert.False(t, enableLocal.EnableDataCoord) assert.False(t, enableLocal.EnableRootCoord) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 3b1c4878954a1..526f9df100d57 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -283,6 +283,9 @@ type commonConfig struct { ReadOnlyPrivileges ParamItem `refreshable:"false"` ReadWritePrivileges ParamItem `refreshable:"false"` AdminPrivileges ParamItem `refreshable:"false"` + + // Local RPC enabled for milvus internal communication when mix or standalone mode. + LocalRPCEnabled ParamItem `refreshable:"false"` } func (p *commonConfig) init(base *BaseTable) { @@ -924,6 +927,15 @@ This helps Milvus-CDC synchronize incremental data`, Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`, } p.AdminPrivileges.Init(base.mgr) + + p.LocalRPCEnabled = ParamItem{ + Key: "common.localRPCEnabled", + Version: "2.4.18", + DefaultValue: "false", + Doc: `enable local rpc for internal communication when mix or standalone mode.`, + Export: true, + } + p.LocalRPCEnabled.Init(base.mgr) } type gpuConfig struct { diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 529ec7a2d969b..cfa0df8ae749c 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -134,6 +134,10 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings())) + + assert.False(t, params.CommonCfg.LocalRPCEnabled.GetAsBool()) + params.Save("common.localRPCEnabled", "true") + assert.True(t, params.CommonCfg.LocalRPCEnabled.GetAsBool()) }) t.Run("test rootCoordConfig", func(t *testing.T) { From 061b057961e3af6a4138beec7f6e7dc7bc62c5b6 Mon Sep 17 00:00:00 2001 From: chyezh Date: Mon, 25 Nov 2024 19:36:03 +0800 Subject: [PATCH 2/2] enhance: add switch for local rpc enabled Signed-off-by: chyezh --- cmd/milvus/util.go | 8 +------- cmd/roles/roles.go | 8 ++++++++ internal/coordinator/coordclient/registry.go | 9 ++++++--- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index c21fe4ec2816b..e7dcb2035343c 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -20,7 +20,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/cmd/roles" - "github.com/milvus-io/milvus/internal/coordinator/coordclient" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" @@ -180,12 +179,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp()) os.Exit(-1) } - coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{ - ServerType: serverType, - EnableQueryCoord: role.EnableQueryCoord, - EnableDataCoord: role.EnableDataCoord, - EnableRootCoord: role.EnableRootCoord, - }) + return role } diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 4698ede3259ad..5f355436fc867 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/cmd/components" + "github.com/milvus-io/milvus/internal/coordinator/coordclient" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/http" "github.com/milvus-io/milvus/internal/http/healthz" @@ -398,6 +399,13 @@ func (mr *MilvusRoles) Run() { defer streaming.Release() } + coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{ + ServerType: mr.ServerType, + EnableQueryCoord: mr.EnableQueryCoord, + EnableDataCoord: mr.EnableDataCoord, + EnableRootCoord: mr.EnableRootCoord, + }) + enableComponents := []bool{ mr.EnableRootCoord, mr.EnableProxy, diff --git a/internal/coordinator/coordclient/registry.go b/internal/coordinator/coordclient/registry.go index a79b11912d7b9..8ba4a721cd5ec 100644 --- a/internal/coordinator/coordclient/registry.go +++ b/internal/coordinator/coordclient/registry.go @@ -50,6 +50,9 @@ type LocalClientRoleConfig struct { // EnableLocalClientRole init localable roles func EnableLocalClientRole(cfg *LocalClientRoleConfig) { + if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { + return + } if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole { return } @@ -58,7 +61,7 @@ func EnableLocalClientRole(cfg *LocalClientRoleConfig) { // RegisterQueryCoordServer register query coord server func RegisterQueryCoordServer(server querypb.QueryCoordServer) { - if !enableLocal.EnableQueryCoord || !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { + if !enableLocal.EnableQueryCoord { return } newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient) @@ -68,7 +71,7 @@ func RegisterQueryCoordServer(server querypb.QueryCoordServer) { // RegsterDataCoordServer register data coord server func RegisterDataCoordServer(server datapb.DataCoordServer) { - if !enableLocal.EnableDataCoord || !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { + if !enableLocal.EnableDataCoord { return } newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient) @@ -78,7 +81,7 @@ func RegisterDataCoordServer(server datapb.DataCoordServer) { // RegisterRootCoordServer register root coord server func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { - if !enableLocal.EnableRootCoord || !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { + if !enableLocal.EnableRootCoord { return } newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient)