feat(codec): Unknown Service Handler (#1321) #1498

Closed
wants to merge 4 commits into from

@lokistars lokistars commented Aug 19, 2024

What type of PR is this?

feat: a new Unknown Service Handler feature

Check the PR title.

We provide an UnknownServiceHandler mechanism that lets a normal Kitex server handle requests for unknown services and methods.

  • This PR title matches the format: <type>(optional scope): <description>
  • The description of this PR title is user-oriented and clear enough for others to understand.
  • Attach the PR updating the user documentation if the current PR requires user awareness at the usage level. User docs repo

(Optional) Translate the PR title into Chinese.

我们为普通的Kitex服务器提供了UnknownServiceHandler机制来处理带有未知服务和方法的请求。

(Optional) More detailed description for this PR(en: English/zh: Chinese).

en:
zh(optional):

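Unknown Service design

1. Injecting the handler

  1. The Unknown Service implementation lives in the pkg/unknownservice package.
  2. When the user injects an UnknownServiceHandler through WithUnknownServiceHandler, it handles requests for unknown services and unknown methods. Only the Thrift and Protobuf protocols are supported for now.
  3. The Thrift and Protobuf codecs are re-wrapped via remote.PutPayloadCode, so every request passes through the UnknownServiceCodec first.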
// WithUnknownServiceHandler injects a handler for requests to unknown services or methods.
// Only the Thrift and Kitex Protobuf protocols are supported.
func WithUnknownServiceHandler(f unknown.UnknownServiceHandler) Option {
    return Option{F: func(o *internal_server.Options, di *utils.Slice) {
       di.Push(fmt.Sprintf("WithUnknownServiceHandler(%+v)", utils.GetFuncName(f)))
       o.RemoteOpt.UnknownServiceHandler = f
       remote.PutPayloadCode(serviceinfo.Thrift, unknownservice.NewUnknownServiceCodec(thrift.NewThriftCodec()))
       remote.PutPayloadCode(serviceinfo.Protobuf, unknownservice.NewUnknownServiceCodec(protobuf.NewProtobufCodec()))
    }}
}
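  1. During server startup, registerUnknownServiceHandler registers an additional ServiceInfo on top of the multi-service mechanism.
  2. The service name is $UnknownService, the method name is $UnknownMethod, and both Args and Result carry their payload as []byte.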
func (s *server) registerUnknownServiceHandler() {
    if s.opt.RemoteOpt.UnknownServiceHandler != nil {
       if len(s.svcs.svcMap) == 1 && s.svcs.svcMap[serviceinfo.GenericService] != nil {
          panic(errors.New("generic services do not support handling of unknown methods"))
       } else {
          serviceInfo := unknownservice.NewServiceInfo(serviceinfo.Thrift, unknownservice.UnknownService, unknownservice.UnknownMethod)
          if err := s.RegisterService(serviceInfo, s.opt.RemoteOpt.UnknownServiceHandler); err != nil {
             panic(err)
          }
       }
    }
}
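
The wrapping described in this section is essentially a decorator around the registered payload codec. The sketch below illustrates only that idea. `PayloadCodec`, `registry`, `strictCodec`, and `fallbackCodec` are simplified stand-ins invented for this example, not Kitex's real `remote.PayloadCodec` or `remote.PutPayloadCode`, and unlike the PR it delegates first and reroutes afterwards.

```go
package main

import (
	"errors"
	"fmt"
)

// PayloadCodec is a simplified, hypothetical stand-in for a payload codec interface.
type PayloadCodec interface {
	Unmarshal(method string, payload []byte) (string, error)
}

// errUnknownMethod is a hypothetical sentinel for methods the inner codec does not know.
var errUnknownMethod = errors.New("unknown method")

// strictCodec only accepts methods it has been told about.
type strictCodec struct{ known map[string]bool }

func (c strictCodec) Unmarshal(method string, payload []byte) (string, error) {
	if !c.known[method] {
		return "", errUnknownMethod
	}
	return "handled " + method, nil
}

// fallbackCodec decorates another codec and reroutes unknown methods.
type fallbackCodec struct{ inner PayloadCodec }

func (c fallbackCodec) Unmarshal(method string, payload []byte) (string, error) {
	out, err := c.inner.Unmarshal(method, payload)
	if errors.Is(err, errUnknownMethod) {
		return "fallback for " + method, nil
	}
	return out, err
}

// registry maps a protocol name to its codec; storing under an existing key replaces the codec.
var registry = map[string]PayloadCodec{}

func main() {
	registry["thrift"] = strictCodec{known: map[string]bool{"Echo": true}}
	// Re-register the same protocol with the decorated codec.
	registry["thrift"] = fallbackCodec{inner: registry["thrift"]}
	fmt.Println(registry["thrift"].Unmarshal("Echo", nil))
	fmt.Println(registry["thrift"].Unmarshal("Missing", nil))
}
```

Because the registry here is a process-wide map, replacing an entry affects every user of that protocol in the process. Whether the same holds for the real registration call is worth checking before relying on this option.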

2. Handling requests

  1. UnknownServiceCodec implements the remote.PayloadCodec interface:
type unknownCodec struct {
    Codec remote.PayloadCodec
}
func (c unknownCodec) Marshal(ctx context.Context, msg remote.Message, out remote.ByteBuffer) error {}

func (c unknownCodec) Unmarshal(ctx context.Context, message remote.Message, in remote.ByteBuffer) error {}

func (c unknownCodec) Name() string { return "unknownServiceCodec"}
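  1. Every request sent from a client to the server first goes through unknownCodec's Unmarshal, which peeks at the buffer to read the message type, service name, and method name, and checks whether the message type is Exception.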
func (c unknownCodec) Unmarshal(ctx context.Context, message remote.Message, in remote.ByteBuffer) error {
    magicAndMsgType, err := codec.PeekUint32(in)
    if err != nil {
        return err
    }
    msgType := magicAndMsgType & codec.FrontMask
    if msgType == uint32(remote.Exception) {
        return c.Codec.Unmarshal(ctx, message, in)
    }
    if err = codec.UpdateMsgType(msgType, message); err != nil {
        return err
    }
    service, method, err := readDecode(message, in)
    if err != nil {
        return err
    }
    err = codec.SetOrCheckMethodName(method, message)
    ...
}

func readDecode(message remote.Message, in remote.ByteBuffer) (string, string, error) {
    code := message.ProtocolInfo().CodecType
    if code == serviceinfo.Thrift || code == serviceinfo.Protobuf {
       method, size, err := peekMethod(in)
       ....
       return message.RPCInfo().Invocation().ServiceName(), method, nil
    }
    return "", "", nil
}

func peekMethod(in remote.ByteBuffer) (string, int32, error) {
    buf, err := in.Peek(8)
    if err != nil {
       return "", 0, err
    }
    buf = buf[4:]
    size := int32(binary.BigEndian.Uint32(buf))
    buf, err = in.Peek(int(size + 8))
    if err != nil {
       return "", 0, perrors.NewProtocolError(err)
    }
    buf = buf[8:]
    method := string(buf)
    return method, size + 8, nil
}
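
For reference, the offsets used in peekMethod correspond to the strict binary Thrift message header: 4 bytes of version and message type, a 4-byte big-endian name length, the name bytes, then a 4-byte sequence ID. The following is a minimal sketch over a plain byte slice rather than a `remote.ByteBuffer`; `buildHeader` and `peekMethodName` are hypothetical helpers written for this example, and the added bounds check is a suggestion rather than what the PR does.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// buildHeader encodes a strict-binary Thrift message header:
// version|type (4 bytes), name length (4 bytes), name, sequence ID (4 bytes).
func buildHeader(method string, msgType byte, seqID uint32) []byte {
	buf := make([]byte, 12+len(method))
	binary.BigEndian.PutUint32(buf[0:4], 0x80010000|uint32(msgType))
	binary.BigEndian.PutUint32(buf[4:8], uint32(len(method)))
	copy(buf[8:], method)
	binary.BigEndian.PutUint32(buf[8+len(method):], seqID)
	return buf
}

// peekMethodName reads the method name without modifying buf,
// rejecting truncated input and lengths that exceed the buffer.
func peekMethodName(buf []byte) (string, error) {
	if len(buf) < 8 {
		return "", errors.New("header shorter than 8 bytes")
	}
	size := binary.BigEndian.Uint32(buf[4:8])
	if uint64(size) > uint64(len(buf)-8) {
		return "", errors.New("method name length exceeds buffer")
	}
	return string(buf[8 : 8+size]), nil
}

func main() {
	hdr := buildHeader("Echo", 1, 7) // 1 is the Thrift CALL message type
	name, err := peekMethodName(hdr)
	fmt.Println(name, err)
}
```

Validating the length before slicing matters because the size field comes straight from the wire, so a malformed request could otherwise ask for an arbitrarily large peek.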
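  1. codec.SetOrCheckMethodName validates the service name and method name; the returned error is checked against the remote.UnknownMethod and remote.UnknownService types.
  2. If the error is neither of these two types, Codec.Unmarshal is called directly for the remaining processing.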
err = codec.SetOrCheckMethodName(method, message)
var te *remote.TransError
if errors.As(err, &te) && (te.TypeID() == remote.UnknownMethod || te.TypeID() == remote.UnknownService) {
    svcInfo, err := message.SpecifyServiceInfo(service2.UnknownService, service2.UnknownMethod)
    if err != nil {
       return err
    }
    ...
 }
 return c.Codec.Unmarshal(ctx, message, in)
  1. If an UnknownMethod or UnknownService error is caught, the invocation information is rewritten through rpcinfo.InvocationSetter:
if ink, ok := ink.(rpcinfo.InvocationSetter); ok {
    ink.SetMethodName(service2.UnknownMethod)
    ink.SetPackageName(svcInfo.GetPackageName())
    ink.SetServiceName(service2.UnknownService)
} else {
    return errors.New("the interface Invocation doesn't implement InvocationSetter")
}
  1. Once the request information is repackaged, the user-supplied UnknownServiceHandler implementation is called. It receives the Context, serviceName, method string, and request []byte, and returns the result as []byte.
func callHandler(ctx context.Context, handler, arg, result interface{}) error {
    realArg := arg.(*Args)
    realResult := result.(*Result)
    realResult.Method = realArg.Method
    realResult.ServiceName = realArg.ServiceName
    success, err := handler.(UnknownServiceHandler).UnknownServiceHandler(ctx, realArg.ServiceName, realArg.Method, realArg.Request)
    if err != nil {
       return err
    }
    realResult.Success = success
    return nil
}

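3. Handling responses

  1. The handler responds with a Result []byte. Users must encode it themselves so that it matches what the client expects to receive.
  2. Marshal checks whether the response object is the unknown-service Result. If it is not, Codec.Marshal handles it. Do not use the unknown-service Result type outside the UnknownService feature.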
func (c unknownCodec) Marshal(ctx context.Context, msg remote.Message, out remote.ByteBuffer) error {
    ink := msg.RPCInfo().Invocation()
    data := msg.Data()

    res, ok := data.(*service2.Result)
    if !ok {
       return c.Codec.Marshal(ctx, msg, out)
    }
 }
  1. If it is the unknown-service Result, the original information in rpcinfo is restored and the response is re-encoded for the client:
if ink, ok := ink.(rpcinfo.InvocationSetter); ok {
    ink.SetMethodName(res.Method)
    ink.SetServiceName(res.ServiceName)
} else {
    return errors.New("the interface Invocation doesn't implement InvocationSetter")
}
  1. Depending on the protocol (Thrift or Protobuf), the []byte returned by UnknownServiceHandler is written into the remote.ByteBuffer:
func encode(res *service2.Result, msg remote.Message, out remote.ByteBuffer) error {
    if msg.ProtocolInfo().CodecType == serviceinfo.Thrift {
       return encodeThrift(res, msg, out)
    }
    if msg.ProtocolInfo().CodecType == serviceinfo.Protobuf {
       return encodeKitexProtobuf(res, msg, out)
    }
    return nil
}
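
The bodies of encodeThrift and encodeKitexProtobuf are not shown in this description. As a rough illustration of the Thrift side only, the sketch below frames raw result bytes with a strict-binary Thrift REPLY header and returns an explicit error for unsupported protocols instead of nil. The function names, the string protocol key, and the error behavior are assumptions made for this example, not the PR's code.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const thriftReply = 2 // Thrift REPLY message type

// encodeThriftReply frames raw result bytes with a strict-binary Thrift message header.
// It assumes payload already holds a validly encoded result struct.
func encodeThriftReply(method string, seqID uint32, payload []byte) []byte {
	out := make([]byte, 12+len(method), 12+len(method)+len(payload))
	binary.BigEndian.PutUint32(out[0:4], 0x80010000|thriftReply)
	binary.BigEndian.PutUint32(out[4:8], uint32(len(method)))
	copy(out[8:], method)
	binary.BigEndian.PutUint32(out[8+len(method):], seqID)
	return append(out, payload...)
}

// encode dispatches on a hypothetical protocol name and rejects anything unsupported.
func encode(protocol, method string, seqID uint32, payload []byte) ([]byte, error) {
	switch protocol {
	case "thrift":
		return encodeThriftReply(method, seqID, payload), nil
	default:
		return nil, fmt.Errorf("unsupported protocol %q", protocol)
	}
}

func main() {
	// A single 0x00 byte is the STOP marker of an empty Thrift struct.
	out, err := encode("thrift", "Echo", 1, []byte{0x00})
	fmt.Printf("% x %v\n", out, err)
	_, err = encode("grpc", "Echo", 1, nil)
	fmt.Println(err)
}
```

Returning an error in the default branch makes an unsupported protocol visible to the caller. In the snippet above, the fall-through returns nil, so nothing is written and no failure is reported.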

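4. Usage

  • server.WithUnknownServiceHandler accepts a value of the UnknownServiceHandler interface type, which has a single method, UnknownServiceHandler:
  • UnknownServiceHandler(ctx context.Context, serviceName, method string, request []byte) ([]byte, error)
  • It receives a Context, the serviceName, the method, and the request body.
  • It returns a []byte; users must encode the response in the form the client expects.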
type UnknownServiceImpl struct{}

// UnknownServiceHandler implements the UnknownServiceHandler interface to handle unknown requests.
func (g *UnknownServiceImpl) UnknownServiceHandler(ctx context.Context, serviceName, method string, request []byte) (response []byte, err error) {
    // request is already a []byte, so no type assertion is needed.
    fmt.Printf("Recv: %v\n", request)
    resp := "{\"Msg\": \"world\"}"
    return []byte(resp), nil
}

func NewServer() {
    var opts []server.Option
    // Register the UnknownServiceHandler on the server.
    opts = append(opts, server.WithUnknownServiceHandler(new(UnknownServiceImpl)))
    svr := echo.NewServer(new(EchoImpl), opts...)
    if err := svr.Run(); err != nil {
        log.Fatal(err)
    }
}

(Optional) Which issue(s) this PR fixes:

Fixes #1321

(optional) The PR that updates user documentation:

@lokistars lokistars requested review from a team as code owners August 19, 2024 11:28
@@ -21,6 +21,8 @@ import (
"net"
"time"

"github.com/cloudwego/kitex/pkg/unknownservice/service"

Member

remove useless empty line

Author

okay

ServiceName string
}

type UnknownServiceHandler interface {
Member

The definition can be put in the unknownservice package directly

Author

import cycle not allowed

@@ -0,0 +1,85 @@
/*
* Copyright 2021 CloudWeGo Authors
Member

@YangruiEmma YangruiEmma Aug 19, 2024

Copyright 2024

Author

okay


"github.com/cloudwego/kitex/pkg/protocol/bthrift"
thrift "github.com/cloudwego/kitex/pkg/protocol/bthrift/apache"

Member

Pay attention to the import grouping

Author

okay

*/

package unknownservice

Member

@YangruiEmma YangruiEmma Aug 19, 2024

rename to unknownservice_codec.go

server/server.go Outdated
@@ -27,6 +27,7 @@ import (
"sync"
"time"

unknownservice "github.com/cloudwego/kitex/pkg/unknownservice/service"
Member

format import

)

// UnknownCodec implements PayloadCodec
type unknownCodec struct {
Member

unknownServiceCodec

UnknownServiceHandler(ctx context.Context, serviceName, method string, request []byte) ([]byte, error)
}

// NewServiceInfo create serviceInfo
Member

"NewServiceInfo create serviceInfo" ?

"errors"
"fmt"

"github.com/cloudwego/kitex/pkg/protocol/bthrift"
Collaborator

bthrift is deprecated

@YangruiEmma
Member

This feature involves a lot of details, and the PR does not meet expectations very well, so it is closed. However, thank you very much for your contribution!

Labels
None yet
Development

Successfully merging this pull request may close these issues.

Feature Proposal: UnknownMethodHandler for KitexThrift/KitexProtobuf requests
3 participants