Skip to content

Commit

Permalink
feat: support thrift and pb multi service (cloudwego#1217)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marina-Sakai authored Feb 1, 2024
1 parent 10ea358 commit 34b88b2
Show file tree
Hide file tree
Showing 44 changed files with 873 additions and 168 deletions.
3 changes: 2 additions & 1 deletion client/service_inline.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func NewServiceInlineClient(svcInfo *serviceinfo.ServiceInfo, s ServerInitialInf
kc.opt = client.NewOptions(opts)
kc.serverEps = s.Endpoints()
kc.serverOpt = s.Option()
kc.serverOpt.RemoteOpt.SvcMap = s.GetServiceInfos()
kc.serverOpt.RemoteOpt.TargetSvcInfo = svcInfo
kc.serverOpt.RemoteOpt.SvcSearchMap = s.GetServiceInfos()
if err := kc.init(); err != nil {
_ = kc.Close()
return nil, err
Expand Down
25 changes: 24 additions & 1 deletion internal/mocks/serviceinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
const (
MockServiceName = "MockService"
MockService2Name = "MockService2"
MockService3Name = "MockService3"
MockMethod string = "mock"
Mock2Method string = "mock2"
MockExceptionMethod string = "mockException"
Expand Down Expand Up @@ -66,7 +67,7 @@ func newServiceInfo() *serviceinfo.ServiceInfo {
return svcInfo
}

// ServiceInfo return mock serviceInfo
// Service2Info return mock serviceInfo
func Service2Info() *serviceinfo.ServiceInfo {
return myServiceService2Info
}
Expand All @@ -88,6 +89,28 @@ func newService2Info() *serviceinfo.ServiceInfo {
return svcInfo
}

// Service3Info return mock serviceInfo
func Service3Info() *serviceinfo.ServiceInfo {
return myServiceService3Info
}

var myServiceService3Info = newService3Info()

func newService3Info() *serviceinfo.ServiceInfo {
methods := map[string]serviceinfo.MethodInfo{
"mock": serviceinfo.NewMethodInfo(mockHandler, NewMockArgs, NewMockResult, false),
}

svcInfo := &serviceinfo.ServiceInfo{
ServiceName: MockService3Name,
Methods: methods,
Extra: map[string]interface{}{
"PackageName": "mock",
},
}
return svcInfo
}

func mockHandler(ctx context.Context, handler, args, result interface{}) error {
a := args.(*myServiceMockArgs)
r := result.(*myServiceMockResult)
Expand Down
2 changes: 2 additions & 0 deletions internal/server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type Options struct {
BackupOpt backup.Options

Streaming stream.StreamingConfig

RefuseTrafficWithoutServiceName bool
}

type Limit struct {
Expand Down
41 changes: 41 additions & 0 deletions internal/server/register_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package server

// RegisterOption is the only way to config service registration.
type RegisterOption struct {
F func(o *RegisterOptions)
}

// RegisterOptions is used to config service registration.
type RegisterOptions struct {
IsFallbackService bool
}

// NewRegisterOptions creates a register options.
func NewRegisterOptions(opts []RegisterOption) *RegisterOptions {
o := &RegisterOptions{}
ApplyRegisterOptions(opts, o)
return o
}

// ApplyRegisterOptions applies the given register options.
func ApplyRegisterOptions(opts []RegisterOption, o *RegisterOptions) {
for _, op := range opts {
op.F(o)
}
}
7 changes: 4 additions & 3 deletions pkg/diagnosis/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func RegisterProbeFunc(svc Service, name ProbeName, pf ProbeFunc) {
// If you want to register other info, please use RegisterProbeFunc(ProbeName, ProbeFunc) to do that.
const (
// Common
ChangeEventsKey ProbeName = "events"
ServiceInfosKey ProbeName = "service_infos"
OptionsKey ProbeName = "options"
ChangeEventsKey ProbeName = "events"
ServiceInfosKey ProbeName = "service_infos"
FallbackServiceKey ProbeName = "fallback_service"
OptionsKey ProbeName = "options"

// Client
DestServiceKey ProbeName = "dest_service"
Expand Down
8 changes: 8 additions & 0 deletions pkg/generic/binarythrift_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ var _ remote.Message = &mockMessage{}
type mockMessage struct {
RPCInfoFunc func() rpcinfo.RPCInfo
ServiceInfoFunc func() *serviceinfo.ServiceInfo
SetServiceInfoFunc func(svcName, methodName string) (*serviceinfo.ServiceInfo, error)
DataFunc func() interface{}
NewDataFunc func(method string) (ok bool)
MessageTypeFunc func() remote.MessageType
Expand Down Expand Up @@ -198,6 +199,13 @@ func (m *mockMessage) ServiceInfo() (si *serviceinfo.ServiceInfo) {
return
}

func (m *mockMessage) SpecifyServiceInfo(svcName, methodName string) (si *serviceinfo.ServiceInfo, err error) {
if m.SetServiceInfoFunc != nil {
return m.SetServiceInfoFunc(svcName, methodName)
}
return nil, nil
}

func (m *mockMessage) Data() interface{} {
if m.DataFunc != nil {
return m.DataFunc()
Expand Down
4 changes: 4 additions & 0 deletions pkg/generic/json_test/generic_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/genericclient"
"github.com/cloudwego/kitex/internal/mocks"
kt "github.com/cloudwego/kitex/internal/mocks/thrift"
"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/generic"
Expand Down Expand Up @@ -269,6 +270,9 @@ func newMockServer(handler kt.Mock, addr net.Addr, opts ...server.Option) server
if err := svr.RegisterService(serviceInfo(), handler); err != nil {
panic(err)
}
if err := svr.RegisterService(mocks.ServiceInfo(), mocks.MyServiceHandler()); err != nil {
panic(err)
}
go func() {
err := svr.Run()
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/remote/codec/header_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ func fillBasicInfoOfTTHeader(msg remote.Message) {
fi.SetServiceName(v)
}
}
if ink, ok := msg.RPCInfo().Invocation().(rpcinfo.InvocationSetter); ok {
if svcName, ok := msg.TransInfo().TransStrInfo()[transmeta.HeaderIDLServiceName]; ok {
ink.SetServiceName(svcName)
}
}
} else {
ti := remoteinfo.AsRemoteInfo(msg.RPCInfo().To())
if ti != nil {
Expand Down
18 changes: 15 additions & 3 deletions pkg/remote/codec/header_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cloudwego/kitex/pkg/remote/transmeta"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo"
"github.com/cloudwego/kitex/pkg/serviceinfo"
tm "github.com/cloudwego/kitex/pkg/transmeta"
"github.com/cloudwego/kitex/transport"
)
Expand Down Expand Up @@ -306,8 +307,18 @@ var (
)

func initServerRecvMsg() remote.Message {
var req interface{}
msg := remote.NewMessage(req, mocks.ServiceInfo(), mockSvrRPCInfo, remote.Call, remote.Server)
svcInfo := mocks.ServiceInfo()
svcSearchMap := map[string]*serviceinfo.ServiceInfo{
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockMethod): svcInfo,
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockExceptionMethod): svcInfo,
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockErrorMethod): svcInfo,
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockOnewayMethod): svcInfo,
mocks.MockMethod: svcInfo,
mocks.MockExceptionMethod: svcInfo,
mocks.MockErrorMethod: svcInfo,
mocks.MockOnewayMethod: svcInfo,
}
msg := remote.NewMessageWithNewer(svcInfo, svcSearchMap, mockSvrRPCInfo, remote.Call, remote.Server, false)
return msg
}

Expand Down Expand Up @@ -367,14 +378,15 @@ func prepareIntKVInfo() map[uint16]string {
}

func prepareStrKVInfo() map[string]string {
kvInfo := map[string]string{}
kvInfo := map[string]string{transmeta.HeaderIDLServiceName: mocks.MockServiceName}
return kvInfo
}

func prepareStrKVInfoWithGDPRToken() map[string]string {
kvInfo := map[string]string{
transmeta.GDPRToken: "mockToken",
transmeta.HeaderTransRemoteAddr: "mockRemoteAddr",
transmeta.HeaderIDLServiceName: mocks.MockServiceName,
}
return kvInfo
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/remote/codec/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ func SetOrCheckMethodName(methodName string, message remote.Message) error {
if message.RPCRole() == remote.Client {
return fmt.Errorf("wrong method name, expect=%s, actual=%s", callMethodName, methodName)
}
svcInfo := message.ServiceInfo()
svcInfo, err := message.SpecifyServiceInfo(ink.ServiceName(), methodName)
if err != nil {
return err
}
if ink, ok := ink.(rpcinfo.InvocationSetter); ok {
ink.SetMethodName(methodName)
ink.SetPackageName(svcInfo.GetPackageName())
ink.SetServiceName(svcInfo.ServiceName)
} else {
return errors.New("the interface Invocation doesn't implement InvocationSetter")
}
if mt := svcInfo.MethodInfo(methodName); mt == nil {
return remote.NewTransErrorWithMsg(remote.UnknownMethod, fmt.Sprintf("unknown method %s", methodName))
}

// unknown method doesn't set methodName for RPCInfo.To(), or lead inconsistent with old version
rpcinfo.AsMutableEndpointInfo(ri.To()).SetMethod(methodName)
Expand Down
20 changes: 18 additions & 2 deletions pkg/remote/codec/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,34 @@ import (
"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

func TestSetOrCheckMethodName(t *testing.T) {
var req interface{}
ri := rpcinfo.NewRPCInfo(nil, rpcinfo.NewEndpointInfo("", "mock", nil, nil),
rpcinfo.NewServerInvocation(), rpcinfo.NewRPCConfig(), rpcinfo.NewRPCStats())
msg := remote.NewMessage(req, mocks.ServiceInfo(), ri, remote.Call, remote.Server)
svcInfo := mocks.ServiceInfo()
svcSearchMap := map[string]*serviceinfo.ServiceInfo{
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockMethod): svcInfo,
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockExceptionMethod): svcInfo,
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockErrorMethod): svcInfo,
remote.BuildMultiServiceKey(mocks.MockServiceName, mocks.MockOnewayMethod): svcInfo,
mocks.MockMethod: svcInfo,
mocks.MockExceptionMethod: svcInfo,
mocks.MockErrorMethod: svcInfo,
mocks.MockOnewayMethod: svcInfo,
}
msg := remote.NewMessageWithNewer(svcInfo, svcSearchMap, ri, remote.Call, remote.Server, false)
err := SetOrCheckMethodName("mock", msg)
test.Assert(t, err == nil)
ri = msg.RPCInfo()
test.Assert(t, ri.Invocation().ServiceName() == mocks.MockServiceName)
test.Assert(t, ri.Invocation().PackageName() == "mock")
test.Assert(t, ri.Invocation().MethodName() == "mock")
test.Assert(t, ri.To().Method() == "mock")

msg = remote.NewMessageWithNewer(svcInfo, map[string]*serviceinfo.ServiceInfo{}, ri, remote.Call, remote.Server, false)
err = SetOrCheckMethodName("dummy", msg)
test.Assert(t, err != nil)
test.Assert(t, err.Error() == "unknown method dummy")
}
Loading

0 comments on commit 34b88b2

Please sign in to comment.