From 3c89baa3a4d1a80268c0440c8fe686745c6d2fa9 Mon Sep 17 00:00:00 2001 From: gzdaijie Date: Wed, 7 Oct 2020 17:35:12 +0800 Subject: [PATCH] gee-rpc client move call.Seq = seq to registerCall --- gee-rpc/day1-codec/main/main.go | 1 + gee-rpc/day2-client/client.go | 7 +- gee-rpc/day2-client/main/main.go | 1 + gee-rpc/day3-service/client.go | 7 +- gee-rpc/day3-service/main/main.go | 1 + gee-rpc/day4-timeout/client.go | 7 +- gee-rpc/day4-timeout/main/main.go | 1 + gee-rpc/day5-http-debug/client.go | 7 +- gee-rpc/day5-http-debug/main/main.go | 1 + gee-rpc/day6-load-balance/client.go | 7 +- gee-rpc/day6-load-balance/main/main.go | 1 + gee-rpc/day7-registry/client.go | 7 +- gee-rpc/day7-registry/main/main.go | 1 + gee-rpc/doc/geerpc-day1.md | 440 +++++++++++++++++++++++++ gee-rpc/doc/geerpc-day2.md | 397 ++++++++++++++++++++++ gee-rpc/doc/geerpc.md | 2 +- 16 files changed, 863 insertions(+), 25 deletions(-) create mode 100644 gee-rpc/doc/geerpc-day1.md create mode 100644 gee-rpc/doc/geerpc-day2.md diff --git a/gee-rpc/day1-codec/main/main.go b/gee-rpc/day1-codec/main/main.go index 948b763..2bc6a8a 100644 --- a/gee-rpc/day1-codec/main/main.go +++ b/gee-rpc/day1-codec/main/main.go @@ -22,6 +22,7 @@ func startServer(addr chan string) { } func main() { + log.SetFlags(0) addr := make(chan string) go startServer(addr) diff --git a/gee-rpc/day2-client/client.go b/gee-rpc/day2-client/client.go index 3df6ec9..1c8a3cb 100644 --- a/gee-rpc/day2-client/client.go +++ b/gee-rpc/day2-client/client.go @@ -73,10 +73,10 @@ func (client *Client) registerCall(call *Call) (uint64, error) { if client.closing || client.shutdown { return 0, ErrShutdown } - seq := client.seq - client.pending[seq] = call + call.Seq = client.seq + client.pending[call.Seq] = call client.seq++ - return seq, nil + return call.Seq, nil } func (client *Client) removeCall(seq uint64) *Call { @@ -106,7 +106,6 @@ func (client *Client) send(call *Call) { // register this call. seq, err := client.registerCall(call) - call.Seq = seq if err != nil { call.Error = err call.done() diff --git a/gee-rpc/day2-client/main/main.go b/gee-rpc/day2-client/main/main.go index 8502fe9..099eb50 100644 --- a/gee-rpc/day2-client/main/main.go +++ b/gee-rpc/day2-client/main/main.go @@ -21,6 +21,7 @@ func startServer(addr chan string) { } func main() { + log.SetFlags(0) addr := make(chan string) go startServer(addr) client, _ := geerpc.Dial("tcp", <-addr) diff --git a/gee-rpc/day3-service/client.go b/gee-rpc/day3-service/client.go index 3df6ec9..1c8a3cb 100644 --- a/gee-rpc/day3-service/client.go +++ b/gee-rpc/day3-service/client.go @@ -73,10 +73,10 @@ func (client *Client) registerCall(call *Call) (uint64, error) { if client.closing || client.shutdown { return 0, ErrShutdown } - seq := client.seq - client.pending[seq] = call + call.Seq = client.seq + client.pending[call.Seq] = call client.seq++ - return seq, nil + return call.Seq, nil } func (client *Client) removeCall(seq uint64) *Call { @@ -106,7 +106,6 @@ func (client *Client) send(call *Call) { // register this call. seq, err := client.registerCall(call) - call.Seq = seq if err != nil { call.Error = err call.done() diff --git a/gee-rpc/day3-service/main/main.go b/gee-rpc/day3-service/main/main.go index 0f0b668..89add53 100644 --- a/gee-rpc/day3-service/main/main.go +++ b/gee-rpc/day3-service/main/main.go @@ -33,6 +33,7 @@ func startServer(addr chan string) { } func main() { + log.SetFlags(0) addr := make(chan string) go startServer(addr) client, _ := geerpc.Dial("tcp", <-addr) diff --git a/gee-rpc/day4-timeout/client.go b/gee-rpc/day4-timeout/client.go index b301647..45fc9e6 100644 --- a/gee-rpc/day4-timeout/client.go +++ b/gee-rpc/day4-timeout/client.go @@ -75,10 +75,10 @@ func (client *Client) registerCall(call *Call) (uint64, error) { if client.closing || client.shutdown { return 0, ErrShutdown } - seq := client.seq - client.pending[seq] = call + call.Seq = client.seq + client.pending[call.Seq] = call client.seq++ - return seq, nil + return call.Seq, nil } func (client *Client) removeCall(seq uint64) *Call { @@ -108,7 +108,6 @@ func (client *Client) send(call *Call) { // register this call. seq, err := client.registerCall(call) - call.Seq = seq if err != nil { call.Error = err call.done() diff --git a/gee-rpc/day4-timeout/main/main.go b/gee-rpc/day4-timeout/main/main.go index 9693eb5..efcf75c 100644 --- a/gee-rpc/day4-timeout/main/main.go +++ b/gee-rpc/day4-timeout/main/main.go @@ -34,6 +34,7 @@ func startServer(addr chan string) { } func main() { + log.SetFlags(0) addr := make(chan string) go startServer(addr) client, _ := geerpc.Dial("tcp", <-addr) diff --git a/gee-rpc/day5-http-debug/client.go b/gee-rpc/day5-http-debug/client.go index 795a18b..bdc2c0f 100644 --- a/gee-rpc/day5-http-debug/client.go +++ b/gee-rpc/day5-http-debug/client.go @@ -78,10 +78,10 @@ func (client *Client) registerCall(call *Call) (uint64, error) { if client.closing || client.shutdown { return 0, ErrShutdown } - seq := client.seq - client.pending[seq] = call + call.Seq = client.seq + client.pending[call.Seq] = call client.seq++ - return seq, nil + return call.Seq, nil } func (client *Client) removeCall(seq uint64) *Call { @@ -111,7 +111,6 @@ func (client *Client) send(call *Call) { // register this call. seq, err := client.registerCall(call) - call.Seq = seq if err != nil { call.Error = err call.done() diff --git a/gee-rpc/day5-http-debug/main/main.go b/gee-rpc/day5-http-debug/main/main.go index 6499b53..cfd1b88 100644 --- a/gee-rpc/day5-http-debug/main/main.go +++ b/gee-rpc/day5-http-debug/main/main.go @@ -51,6 +51,7 @@ func call(addrCh chan string) { } func main() { + log.SetFlags(0) ch := make(chan string) go call(ch) startServer(ch) diff --git a/gee-rpc/day6-load-balance/client.go b/gee-rpc/day6-load-balance/client.go index 795a18b..bdc2c0f 100644 --- a/gee-rpc/day6-load-balance/client.go +++ b/gee-rpc/day6-load-balance/client.go @@ -78,10 +78,10 @@ func (client *Client) registerCall(call *Call) (uint64, error) { if client.closing || client.shutdown { return 0, ErrShutdown } - seq := client.seq - client.pending[seq] = call + call.Seq = client.seq + client.pending[call.Seq] = call client.seq++ - return seq, nil + return call.Seq, nil } func (client *Client) removeCall(seq uint64) *Call { @@ -111,7 +111,6 @@ func (client *Client) send(call *Call) { // register this call. seq, err := client.registerCall(call) - call.Seq = seq if err != nil { call.Error = err call.done() diff --git a/gee-rpc/day6-load-balance/main/main.go b/gee-rpc/day6-load-balance/main/main.go index d00f864..550e848 100644 --- a/gee-rpc/day6-load-balance/main/main.go +++ b/gee-rpc/day6-load-balance/main/main.go @@ -84,6 +84,7 @@ func broadcast(addr1, addr2 string) { } func main() { + log.SetFlags(0) ch1 := make(chan string) ch2 := make(chan string) // start two servers diff --git a/gee-rpc/day7-registry/client.go b/gee-rpc/day7-registry/client.go index 795a18b..bdc2c0f 100644 --- a/gee-rpc/day7-registry/client.go +++ b/gee-rpc/day7-registry/client.go @@ -78,10 +78,10 @@ func (client *Client) registerCall(call *Call) (uint64, error) { if client.closing || client.shutdown { return 0, ErrShutdown } - seq := client.seq - client.pending[seq] = call + call.Seq = client.seq + client.pending[call.Seq] = call client.seq++ - return seq, nil + return call.Seq, nil } func (client *Client) removeCall(seq uint64) *Call { @@ -111,7 +111,6 @@ func (client *Client) send(call *Call) { // register this call. seq, err := client.registerCall(call) - call.Seq = seq if err != nil { call.Error = err call.done() diff --git a/gee-rpc/day7-registry/main/main.go b/gee-rpc/day7-registry/main/main.go index 0776312..50b57dd 100644 --- a/gee-rpc/day7-registry/main/main.go +++ b/gee-rpc/day7-registry/main/main.go @@ -94,6 +94,7 @@ func broadcast(registry string) { } func main() { + log.SetFlags(0) registryAddr := "http://localhost:9999/_geerpc_/registry" var wg sync.WaitGroup wg.Add(1) diff --git a/gee-rpc/doc/geerpc-day1.md b/gee-rpc/doc/geerpc-day1.md new file mode 100644 index 0000000..abfeb3c --- /dev/null +++ b/gee-rpc/doc/geerpc-day1.md @@ -0,0 +1,440 @@ +--- +title: 动手写RPC框架 - GeeRPC第一天 服务端与消息编码 +date: 2020-10-06 17:00:00 +description: 7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang 标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。第一天实现了一个简单的服务端和消息的编码与解码。 +tags: +- Go +nav: 从零实现 +categories: +- RPC框架 - GeeRPC +keywords: +- Go语言 +- 从零实现RPC框架 +- Codec +- 序列化 +- 反序列化 +image: post/geerpc/geerpc.jpg +github: https://github.com/geektutu/7days-golang +--- + +![golang RPC framework](geerpc/geerpc.jpg) + +本文是[7天用Go从零实现RPC框架GeeRPC]的第一篇。 + +- 使用 `encoding/gob` 实现消息的编解码(序列化与反序列化) +- 实现一个简易的服务端,仅接受消息,不处理,代码约 200 行 + + +## 消息的序列化与反序列化 + +一个典型的 RPC 调用如下: + +```go +err = client.Call("Arith.Multiply", args, &reply) +``` + +客户端发送的请求包括服务名 `Arith`,方法名 `Multiply`,参数 `args` 三个,服务端的响应包括错误 `error`,返回值 `reply` 2 个。我们将请求和响应中的参数和返回值抽象为 body,剩余的信息放在 header 中,那么就可以抽象出数据结构 Header: + +[day1-codec/codec/codec.go](https://github.com/geektutu/7days-golang/tree/master/gee-rpc/day1-codec) + +```go +package codec + +import "io" + +type Header struct { + ServiceMethod string // format "Service.Method" + Seq uint64 // sequence number chosen by client + Error string +} +``` + +- ServiceMethod 是服务名和方法名,通常与 Go 语言中的结构体和方法相映射。 +- Seq 是请求的序号,也可以认为是某个请求的 ID,用来区分不同的请求。 +- Error 是错误信息,客户端置为空,服务端如果如果发生错误,将错误信息置于 Error 中。 + + +我们将和消息编解码相关的代码都防到 codec 目录中。 + +进一步,抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例: + +```go +type Codec interface { + io.Closer + ReadHeader(*Header) error + ReadBody(interface{}) error + Write(*Header, interface{}) error +} +``` + +紧接着,抽象出 Codec 的构造函数,客户端和服务端可以通过 Codec 的 `Type` 得到构造函数,从而创建 Codec 实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。 + +```go +type NewCodecFunc func(io.ReadWriteCloser) Codec + +type Type string + +const ( + GobType Type = "application/gob" + JsonType Type = "application/json" // not implemented +) + +var NewCodecFuncMap map[Type]NewCodecFunc + +func init() { + NewCodecFuncMap = make(map[Type]NewCodecFunc) + NewCodecFuncMap[GobType] = NewGobCodec +} +``` + +我们定义了 2 种 Codec,`Gob` 和 `Json`,但是实际代码中只实现了 `Gob` 一种,事实上,2 者的实现非常接近,甚至只需要把 `gob` 换成 `json` 即可。 + +首先定义 `GobCodec` 结构体,这个结构体由四部分构成,`conn` 是由构建函数传入,通常是通过 TCP 或者 Unix 建立 socket 时得到的链接实例,dec 和 enc 对应 gob 的 Decoder 和 Encoder,buf 是为了防止阻塞而创建的带缓冲的 `Writer`,一般这么做能提升性能。 + +[day1-codec/codec/gob.go](https://github.com/geektutu/7days-golang/tree/master/gee-rpc/day1-codec) + +```go +package codec + +import ( + "bufio" + "encoding/gob" + "io" + "log" +) + +type GobCodec struct { + conn io.ReadWriteCloser + buf *bufio.Writer + dec *gob.Decoder + enc *gob.Encoder +} + +var _ Codec = (*GobCodec)(nil) + +func NewGobCodec(conn io.ReadWriteCloser) Codec { + buf := bufio.NewWriter(conn) + return &GobCodec{ + conn: conn, + buf: buf, + dec: gob.NewDecoder(conn), + enc: gob.NewEncoder(buf), + } +} +``` + +接着实现 `ReadHeader`、`ReadBody`、`Write` 和 `Close` 方法。 + +```go +func (c *GobCodec) ReadHeader(h *Header) error { + return c.dec.Decode(h) +} + +func (c *GobCodec) ReadBody(body interface{}) error { + return c.dec.Decode(body) +} + +func (c *GobCodec) Write(h *Header, body interface{}) (err error) { + defer func() { + _ = c.buf.Flush() + if err != nil { + _ = c.Close() + } + }() + if err := c.enc.Encode(h); err != nil { + log.Println("rpc codec: gob error encoding header:", err) + return err + } + if err := c.enc.Encode(body); err != nil { + log.Println("rpc codec: gob error encoding body:", err) + return err + } + return nil +} + +func (c *GobCodec) Close() error { + return c.conn.Close() +} +``` + +## 通信过程 + +客户端与服务端的通信需要协商一些内容,例如 HTTP 报文,分为 header 和 body 2 部分,body 的格式和长度通过 header 中的 `Content-Type` 和 `Content-Length` 指定,服务端通过解析 header 就能够知道如何从 body 中读取需要的信息。对于 RPC 协议来说,这部分协商是需要自主设计的。为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第1个字节用来表示序列化方式,第2个字节表示压缩方式,第3-6字节表示 header 的长度,7-10 字节表示 body 的长度。 + +对于 GeeRPC 来说,目前需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息,放到结构体 `Option` 中承载。目前,已经进入到服务端的实现阶段了。 + +[day1-codec/server.go](https://github.com/geektutu/7days-golang/tree/master/gee-rpc/day1-codec) + +```go +package geerpc + +const MagicNumber = 0x3bef5c + +type Option struct { + MagicNumber int // MagicNumber marks this's a geerpc request + CodecType codec.Type // client may choose different Codec to encode body +} + +var DefaultOption = &Option{ + MagicNumber: MagicNumber, + CodecType: codec.GobType, +} +``` + +一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 得 CodeType 解码剩余的内容。即报文将以这样的形式发送: + +```bash +| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} | +| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->| +``` + +在一次连接中,Option 固定在报文的最开始,Header 和 Body 可以有多个,即报文可能是这样的。 + +```go +| Option | Header1 | Body1 | Header2 | Body2 | ... +``` + +## 服务端的实现 + +通信过程已经定义清楚了,那么服务端的实现就比较直接了。 + +[day1-codec/server.go](https://github.com/geektutu/7days-golang/tree/master/gee-rpc/day1-codec) + +```go +// Server represents an RPC Server. +type Server struct{} + +// NewServer returns a new Server. +func NewServer() *Server { + return &Server{} +} + +// DefaultServer is the default instance of *Server. +var DefaultServer = NewServer() + +// Accept accepts connections on the listener and serves requests +// for each incoming connection. +func (server *Server) Accept(lis net.Listener) { + for { + conn, err := lis.Accept() + if err != nil { + log.Println("rpc server: accept error:", err) + return + } + go server.ServeConn(conn) + } +} + +// Accept accepts connections on the listener and serves requests +// for each incoming connection. +func Accept(lis net.Listener) { DefaultServer.Accept(lis) } +``` + +- 首先定义了结构体 `Server`,没有任何的成员字段。 +- 实现了 `Accept` 方式,`net.Listener` 作为参数,for 循环等待 socket 连接建立,并开启子协程处理,处理过程交给了 `ServerConn` 方法。 +- DefaultServer 是一个默认的 `Server` 实例,主要为了用户使用方便。 + +如果想启动服务,过程是非常简单的,传入 listener 即可,tcp 协议和 unix 协议都支持。 + +```go +lis, _ := net.Listen("tcp", ":9999") +geerpc.Accept(lis) +``` + +`ServeConn` 的实现就和之前讨论的通信过程紧密相关了,首先使用 `json.NewDecoder` 反序列化得到 Option 实例,检查 MagicNumber 和 CodeType 的值是否正确。然后根据 CodeType 得到对应的消息编解码器,接下来的处理交给 `serverCodec`。 + +```go +// ServeConn runs the server on a single connection. +// ServeConn blocks, serving the connection until the client hangs up. +func (server *Server) ServeConn(conn io.ReadWriteCloser) { + defer func() { _ = conn.Close() }() + var opt Option + if err := json.NewDecoder(conn).Decode(&opt); err != nil { + log.Println("rpc server: options error: ", err) + return + } + if opt.MagicNumber != MagicNumber { + log.Printf("rpc server: invalid magic number %x", opt.MagicNumber) + return + } + f := codec.NewCodecFuncMap[opt.CodecType] + if f == nil { + log.Printf("rpc server: invalid codec type %s", opt.CodecType) + return + } + server.serveCodec(f(conn)) +} + +// invalidRequest is a placeholder for response argv when error occurs +var invalidRequest = struct{}{} + +func (server *Server) serveCodec(cc codec.Codec) { + sending := new(sync.Mutex) // make sure to send a complete response + wg := new(sync.WaitGroup) // wait until all request are handled + for { + req, err := server.readRequest(cc) + if err != nil { + if req == nil { + break // it's not possible to recover, so close the connection + } + req.h.Error = err.Error() + server.sendResponse(cc, req.h, invalidRequest, sending) + continue + } + wg.Add(1) + go server.handleRequest(cc, req, sending, wg) + } + wg.Wait() + _ = cc.Close() +} +``` + +`serveCodec` 的过程非常简单。主要包含三个阶段 + +- 读取请求 readRequest +- 处理请求 handleRequest +- 回复请求 sendResponse + +之前提到过,在一次连接中,允许接收多个请求,即多个 request header 和 request body,因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个: + +- handleRequest 使用了协程并发执行请求。 +- 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。 +- 尽力而为,只有在 header 解析失败时,才终止循环。 + +```go +// request stores all information of a call +type request struct { + h *codec.Header // header of request + argv, replyv reflect.Value // argv and replyv of request +} + +func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) { + var h codec.Header + if err := cc.ReadHeader(&h); err != nil { + if err != io.EOF && err != io.ErrUnexpectedEOF { + log.Println("rpc server: read header error:", err) + } + return nil, err + } + return &h, nil +} + +func (server *Server) readRequest(cc codec.Codec) (*request, error) { + h, err := server.readRequestHeader(cc) + if err != nil { + return nil, err + } + req := &request{h: h} + // TODO: now we don't know the type of request argv + // day 1, just suppose it's string + req.argv = reflect.New(reflect.TypeOf("")) + if err = cc.ReadBody(req.argv.Interface()); err != nil { + log.Println("rpc server: read argv err:", err) + } + return req, nil +} + +func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) { + sending.Lock() + defer sending.Unlock() + if err := cc.Write(h, body); err != nil { + log.Println("rpc server: write response error:", err) + } +} + +func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) { + // TODO, should call registered rpc methods to get the right replyv + // day 1, just print argv and send a hello message + defer wg.Done() + log.Println(req.h, req.argv.Elem()) + req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq)) + server.sendResponse(cc, req.h, req.replyv.Interface(), sending) +} +``` + +目前还不能判断 body 的类型,因此在 readRequest 和 handleRequest 中,day1 将 body 作为字符串处理。接收到请求,打印 header,并回复 `geerpc resp ${req.h.Seq}`。这一部分后续再实现。 + + +## main 函数(一个简易的客户端) + +day1 的内容就到此为止了,在这里我们已经实现了一个消息的编解码器 `GobCodec`,并且客户端与服务端实现了简单的协议交换(protocol exchange),即允许客户端使用不同的编码方式。同时实现了服务端的雏形,建立连接,读取、处理并回复客户端的请求。 + +接下来,我们就在 main 函数中看看如何使用刚实现的 GeeRPC 吧。 + +[day1-codec/main/main.go](https://github.com/geektutu/7days-golang/tree/master/gee-rpc/day1-codec) + +```go +package main + +import ( + "encoding/json" + "fmt" + "geerpc" + "geerpc/codec" + "log" + "net" + "time" +) + +func startServer(addr chan string) { + // pick a free port + l, err := net.Listen("tcp", ":0") + if err != nil { + log.Fatal("network error:", err) + } + log.Println("start rpc server on", l.Addr()) + addr <- l.Addr().String() + geerpc.Accept(l) +} + +func main() { + addr := make(chan string) + go startServer(addr) + + // in fact, following code is like a simple geerpc client + conn, _ := net.Dial("tcp", <-addr) + defer func() { _ = conn.Close() }() + + time.Sleep(time.Second) + // send options + _ = json.NewEncoder(conn).Encode(geerpc.DefaultOption) + cc := codec.NewGobCodec(conn) + // send request & receive response + for i := 0; i < 5; i++ { + h := &codec.Header{ + ServiceMethod: "Foo.Sum", + Seq: uint64(i), + } + _ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq)) + _ = cc.ReadHeader(h) + var reply string + _ = cc.ReadBody(&reply) + log.Println("reply:", reply) + } +} +``` + +- 在 `startServer` 中使用了信道 `addr`,确保服务端端口监听成功,客户端再发起请求。 +- 客户端首先发送 `Option` 进行协议交换,接下来发送消息头 `h := &codec.Header{}`,和消息体 `geerpc req ${h.Seq}`。 +- 最后解析服务端的响应 `reply`,并打印出来。 + +执行结果如下: + +```bash +start rpc server on [::]:63662 +&{Foo.Sum 0 } geerpc req 0 +reply: geerpc resp 0 +&{Foo.Sum 1 } geerpc req 1 +reply: geerpc resp 1 +&{Foo.Sum 2 } geerpc req 2 +reply: geerpc resp 2 +&{Foo.Sum 3 } geerpc req 3 +reply: geerpc resp 3 +&{Foo.Sum 4 } geerpc req 4 +reply: geerpc resp 4 +``` + +## 附 推荐阅读 + +- [Go 语言简明教程](https://geektutu.com/post/quick-golang.html) +- [Go 语言笔试面试题](https://geektutu.com/post/qa-golang.html) \ No newline at end of file diff --git a/gee-rpc/doc/geerpc-day2.md b/gee-rpc/doc/geerpc-day2.md new file mode 100644 index 0000000..2d356e3 --- /dev/null +++ b/gee-rpc/doc/geerpc-day2.md @@ -0,0 +1,397 @@ +--- +title: 动手写RPC框架 - GeeRPC第二天 支持并发与异步的客户端 +date: 2020-10-07 18:00:00 +description: 7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang 标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。第二天实现了一个支持异步(asynchronous)和并发(concurrent)的客户端。 +tags: +- Go +nav: 从零实现 +categories: +- RPC框架 - GeeRPC +keywords: +- Go语言 +- 从零实现RPC框架 +- 客户端 +- 异步 +- 并发 +image: post/geerpc/geerpc.jpg +github: https://github.com/geektutu/7days-golang +--- + +![golang RPC framework](geerpc/geerpc.jpg) + +本文是[7天用Go从零实现RPC框架GeeRPC]的第二篇。 + +- 实现一个支持异步和并发的高性能客户端,代码约 250 行 + + +## Call 的设计 + +对 `net/rpc` 而言,一个函数需要能够被远程调用,需要满足如下五个条件: + +- the method's type is exported. +- the method is exported. +- the method has two arguments, both exported (or builtin) types. +- the method's second argument is a pointer. +- the method has return type error. + +更直观一些: + +```go +func (t *T) MethodName(argType T1, replyType *T2) error +``` + +根据上述要求,首先我们封装了结构体 Call 来承载一次 RPC 调用所需要的信息。 + +[day2-client/client.go](https://github.com/geektutu/7days-golang/tree/master/gee-rpc/day2-client) + +```go +// Call represents an active RPC. +type Call struct { + Seq uint64 + ServiceMethod string // format "." + Args interface{} // arguments to the function + Reply interface{} // reply from the function + Error error // if error occurs, it will be set + Done chan *Call // Strobes when call is complete. +} + +func (call *Call) done() { + call.Done <- call +} +``` + +为了支持异步调用,Call 结构体中添加了一个字段 Done,Done 的类型是 `chan *Call`,当调用结束时,会调用 `call.done()` 通知调用方。 + + +## 实现 Client + +接下来,我们将实现 GeeRPC 客户端最核心的部分 Client。 + +```go +// Client represents an RPC Client. +// There may be multiple outstanding Calls associated +// with a single Client, and a Client may be used by +// multiple goroutines simultaneously. +type Client struct { + cc codec.Codec + opt *Option + sending sync.Mutex // protect following + header codec.Header + mu sync.Mutex // protect following + seq uint64 + pending map[uint64]*Call + closing bool // user has called Close + shutdown bool // server has told us to stop +} + +var _ io.Closer = (*Client)(nil) + +var ErrShutdown = errors.New("connection is shut down") + +// Close the connection +func (client *Client) Close() error { + client.mu.Lock() + defer client.mu.Unlock() + if client.closing { + return ErrShutdown + } + client.closing = true + return client.cc.Close() +} + +// IsAvailable return true if the client does work +func (client *Client) IsAvailable() bool { + client.mu.Lock() + defer client.mu.Unlock() + return !client.shutdown && !client.closing +} +``` + +Client 的字段比较复杂: + +- cc 是消息的编解码器,和服务端类似,用来序列化将要发送出去的请求,以及反序列化接收到的响应。 +- sending 是一个互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。 +- header 是每个请求的消息头,header 只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在 Client 结构体中可以复用。 +- seq 用于给发送的请求编号,每个请求拥有唯一编号。 +- pending 存储未处理完的请求,键是编号,值是 Call 实例。 +- closing 和 shutdown 任意一个值置为 true,则表示 Client 处于不可用的状态,但有些许的差别,closing 是用户主动关闭的,即调用 `Close` 方法,而 shutdown 置为 true 一般是有错误发生。 + +紧接着,实现和 Call 相关的三个方法。 + +```go +func (client *Client) registerCall(call *Call) (uint64, error) { + client.mu.Lock() + defer client.mu.Unlock() + if client.closing || client.shutdown { + return 0, ErrShutdown + } + call.Seq = client.seq + client.pending[call.Seq] = call + client.seq++ + return call.Seq, nil +} + +func (client *Client) removeCall(seq uint64) *Call { + client.mu.Lock() + defer client.mu.Unlock() + call := client.pending[seq] + delete(client.pending, seq) + return call +} + +func (client *Client) terminateCalls(err error) { + client.sending.Lock() + defer client.sending.Unlock() + client.mu.Lock() + defer client.mu.Unlock() + client.shutdown = true + for _, call := range client.pending { + call.Error = err + call.done() + } +} +``` + +- registerCall:将参数 call 添加到 client.pending 中,并更新 client.seq。 +- removeCall:根据 seq,从 client.pending 中移除对应的 call,并返回。 +- terminateCalls:服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call。 + +对一个客户端端来说,接收响应、发送请求是最重要的 2 个功能。那么首先实现接收功能,接收到的响应有三种情况: + +- call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。 +- call 存在,但服务端处理出错,即 h.Error 不为空。 +- call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。 + +```go +func (client *Client) receive() { + var err error + for err == nil { + var h codec.Header + if err = client.cc.ReadHeader(&h); err != nil { + break + } + call := client.removeCall(h.Seq) + switch { + case call == nil: + // it usually means that Write partially failed + // and call was already removed. + err = client.cc.ReadBody(nil) + case h.Error != "": + call.Error = fmt.Errorf(h.Error) + err = client.cc.ReadBody(nil) + call.done() + default: + err = client.cc.ReadBody(call.Reply) + if err != nil { + call.Error = errors.New("reading body " + err.Error()) + } + call.done() + } + } + // error occurs, so terminateCalls pending calls + client.terminateCalls(err) +} +``` + +创建 Client 实例时,首先需要完成一开始的协议交换,即发送 `Option` 信息给服务端。协商好消息的编解码方式之后,再创建一个子协程调用 `receive()` 接收响应。 + +```go +func NewClient(conn net.Conn, opt *Option) (*Client, error) { + f := codec.NewCodecFuncMap[opt.CodecType] + if f == nil { + err := fmt.Errorf("invalid codec type %s", opt.CodecType) + log.Println("rpc client: codec error:", err) + return nil, err + } + // send options with server + if err := json.NewEncoder(conn).Encode(opt); err != nil { + log.Println("rpc client: options error: ", err) + _ = conn.Close() + return nil, err + } + return newClientCodec(f(conn), opt), nil +} + +func newClientCodec(cc codec.Codec, opt *Option) *Client { + client := &Client{ + seq: 1, // seq starts with 1, 0 means invalid call + cc: cc, + opt: opt, + pending: make(map[uint64]*Call), + } + go client.receive() + return client +} +``` + +还需要实现 `Dial` 函数,便于用户传入服务端地址,创建 Client 实例。为了简化用户调用,通过 `...*Option` 将 Option 实现为可选参数。 + +```go +func parseOptions(opts ...*Option) (*Option, error) { + // if opts is nil or pass nil as parameter + if len(opts) == 0 || opts[0] == nil { + return DefaultOption, nil + } + if len(opts) != 1 { + return nil, errors.New("number of options is more than 1") + } + opt := opts[0] + opt.MagicNumber = DefaultOption.MagicNumber + if opt.CodecType == "" { + opt.CodecType = DefaultOption.CodecType + } + return opt, nil +} + +func dial(network, address string, opt *Option) (*Client, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + return NewClient(conn, opt) +} + +// Dial connects to an RPC server at the specified network address +func Dial(network, address string, opts ...*Option) (*Client, error) { + opt, err := parseOptions(opts...) + if err != nil { + return nil, err + } + return dial(network, address, opt) +} +``` + +此时,GeeRPC 客户端已经具备了完整的创建连接和接收响应的能力了,最后还需要实现发送请求的能力。 + +```go +func (client *Client) send(call *Call) { + // make sure that the client will send a complete request + client.sending.Lock() + defer client.sending.Unlock() + + // register this call. + seq, err := client.registerCall(call) + if err != nil { + call.Error = err + call.done() + return + } + + // prepare request header + client.header.ServiceMethod = call.ServiceMethod + client.header.Seq = seq + client.header.Error = "" + + // encode and send the request + if err := client.cc.Write(&client.header, call.Args); err != nil { + call := client.removeCall(seq) + // call may be nil, it usually means that Write partially failed, + // client has received the response and handled + if call != nil { + call.Error = err + call.done() + } + } +} + +// Go invokes the function asynchronously. +// It returns the Call structure representing the invocation. +func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call { + if done == nil { + done = make(chan *Call, 10) + } else if cap(done) == 0 { + log.Panic("rpc client: done channel is unbuffered") + } + call := &Call{ + ServiceMethod: serviceMethod, + Args: args, + Reply: reply, + Done: done, + } + client.send(call) + return call +} + +// Call invokes the named function, waits for it to complete, +// and returns its error status. +func (client *Client) Call(serviceMethod string, args, reply interface{}) error { + call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done + return call.Error +} +``` + +- `Go` 和 `Call` 是客户端暴露给用户的两个 RPC 服务调用接口,`Go` 是一个异步接口,返回 call 实例。 +- `Call` 是对 `Go` 的封装,阻塞 call.Done,等待响应返回,是一个同步接口。 + +至此,一个支持异步和并发的 GeeRPC 客户端已经完成。 + +## Demo + +第一天 GeeRPC 只实现了服务端,因此我们在 main 函数中手动模拟了整个通信过程,今天我们就将 main 函数中通信部分替换为今天的客户端吧。 + +[day2-client/main/main.go](https://github.com/geektutu/7days-golang/tree/master/gee-rpc/day2-client) + +startServer 没有发生变化。 + +```go +func startServer(addr chan string) { + // pick a free port + l, err := net.Listen("tcp", ":0") + if err != nil { + log.Fatal("network error:", err) + } + log.Println("start rpc server on", l.Addr()) + addr <- l.Addr().String() + geerpc.Accept(l) +} +``` + +在 main 函数中使用了 `client.Call` 并发了 5 个 RPC 同步调用,参数和返回值的类型均为 string。 + +```go +func main() { + log.SetFlags(0) + addr := make(chan string) + go startServer(addr) + client, _ := geerpc.Dial("tcp", <-addr) + defer func() { _ = client.Close() }() + + time.Sleep(time.Second) + // send request & receive response + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + args := fmt.Sprintf("geerpc req %d", i) + var reply string + if err := client.Call("Foo.Sum", args, &reply); err != nil { + log.Fatal("call Foo.Sum error:", err) + } + log.Println("reply:", reply) + }(i) + } + wg.Wait() +} +``` + +运行结果如下: + +```bash +start rpc server on [::]:50658 +&{Foo.Sum 5 } geerpc req 3 +&{Foo.Sum 1 } geerpc req 0 +&{Foo.Sum 3 } geerpc req 1 +&{Foo.Sum 2 } geerpc req 4 +&{Foo.Sum 4 } geerpc req 2 +reply: geerpc resp 1 +reply: geerpc resp 5 +reply: geerpc resp 3 +reply: geerpc resp 2 +reply: geerpc resp 4 +``` + +## 附 推荐阅读 + +- [Go 语言简明教程](https://geektutu.com/post/quick-golang.html) +- [Go 语言笔试面试题](https://geektutu.com/post/qa-golang.html) \ No newline at end of file diff --git a/gee-rpc/doc/geerpc.md b/gee-rpc/doc/geerpc.md index 9c49e6d..7a6a324 100644 --- a/gee-rpc/doc/geerpc.md +++ b/gee-rpc/doc/geerpc.md @@ -1,7 +1,7 @@ --- title: 7天用Go从零实现RPC框架GeeRPC date: 2020-10-06 16:00:00 -description: 7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。并在此基础上新增了协议交换(protocol exchange)、注册中心(registry)、服务发现(service discovery)、负载均衡(load balance)、超时处理(timeout processing)等特性。 +description: 7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang 标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。并在此基础上新增了协议交换(protocol exchange)、注册中心(registry)、服务发现(service discovery)、负载均衡(load balance)、超时处理(timeout processing)等特性。 tags: - Go nav: 从零实现