Skip to content

Commit

Permalink
gee-rpc: implement day5-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
geektutu committed Oct 3, 2020
1 parent dc840eb commit 1b78c37
Show file tree
Hide file tree
Showing 25 changed files with 1,117 additions and 106 deletions.
6 changes: 1 addition & 5 deletions gee-rpc/day1-codec/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ func main() {
defer func() { _ = conn.Close() }()

// send options
_ = json.NewEncoder(conn).Encode(&geerpc.Options{
MagicNumber: geerpc.MagicNumber,
CodecType: codec.GobType,
})

_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
cc := codec.NewGobCodec(conn)
// send request & receive response
for i := 0; i < 5; i++ {
Expand Down
6 changes: 3 additions & 3 deletions gee-rpc/day1-codec/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (

const MagicNumber = 0x3bef5c

type Options struct {
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
}

var defaultOptions = &Options{
var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
}
Expand All @@ -42,7 +42,7 @@ var DefaultServer = NewServer()
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() { _ = conn.Close() }()
var opt Options
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error: ", err)
return
Expand Down
39 changes: 23 additions & 16 deletions gee-rpc/day2-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

// Call represents an active RPC.
type Call struct {
Seq uint64
ServiceMethod string // format "<service>.<method>"
Args interface{} // arguments to the function
Reply interface{} // reply from the function
Expand All @@ -34,7 +35,7 @@ func (call *Call) done() {
// multiple goroutines simultaneously.
type Client struct {
cc codec.Codec
opt *Options
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
Expand Down Expand Up @@ -96,6 +97,7 @@ func (client *Client) send(call *Call) {

// register this call.
seq, err := client.registerCall(call)
call.Seq = seq
if err != nil {
call.Error = err
call.done()
Expand Down Expand Up @@ -173,44 +175,41 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
return call.Error
}

func parseOptions(opts ...*Options) (*Options, error) {
func parseOptions(opts ...*Option) (*Option, error) {
// if opts is nil or pass nil as parameter
if len(opts) == 0 || opts[0] == nil {
return defaultOptions, nil
return DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("number of options is more than 1")
}
opt := opts[0]
opt.MagicNumber = defaultOptions.MagicNumber
opt.MagicNumber = DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = defaultOptions.CodecType
opt.CodecType = DefaultOption.CodecType
}
return opt, nil
}

func NewClient(conn io.ReadWriteCloser, opts ...*Options) (*Client, error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
err = fmt.Errorf("invalid codec type %s", opt.CodecType)
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
// send options with server
if err = json.NewEncoder(conn).Encode(opt); err != nil {
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
_ = conn.Close()
return nil, err
}
return newClientCodec(f(conn), opt), nil
}

func newClientCodec(cc codec.Codec, opt *Options) *Client {
func newClientCodec(cc codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1, // seq starts with 1, 0 means invalid call
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
Expand All @@ -219,11 +218,19 @@ func newClientCodec(cc codec.Codec, opt *Options) *Client {
return client
}

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Options) (*Client, error) {
func dial(network, address string, opt *Option) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn, opts...)
return NewClient(conn, opt)
}

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
return dial(network, address, opt)
}
6 changes: 3 additions & 3 deletions gee-rpc/day2-client/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (

const MagicNumber = 0x3bef5c

type Options struct {
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
}

var defaultOptions = &Options{
var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
}
Expand All @@ -42,7 +42,7 @@ var DefaultServer = NewServer()
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() { _ = conn.Close() }()
var opt Options
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error: ", err)
return
Expand Down
39 changes: 23 additions & 16 deletions gee-rpc/day3-service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

// Call represents an active RPC.
type Call struct {
Seq uint64
ServiceMethod string // format "<service>.<method>"
Args interface{} // arguments to the function
Reply interface{} // reply from the function
Expand All @@ -34,7 +35,7 @@ func (call *Call) done() {
// multiple goroutines simultaneously.
type Client struct {
cc codec.Codec
opt *Options
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
Expand Down Expand Up @@ -96,6 +97,7 @@ func (client *Client) send(call *Call) {

// register this call.
seq, err := client.registerCall(call)
call.Seq = seq
if err != nil {
call.Error = err
call.done()
Expand Down Expand Up @@ -173,44 +175,41 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
return call.Error
}

func parseOptions(opts ...*Options) (*Options, error) {
func parseOptions(opts ...*Option) (*Option, error) {
// if opts is nil or pass nil as parameter
if len(opts) == 0 || opts[0] == nil {
return defaultOptions, nil
return DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("number of options is more than 1")
}
opt := opts[0]
opt.MagicNumber = defaultOptions.MagicNumber
opt.MagicNumber = DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = defaultOptions.CodecType
opt.CodecType = DefaultOption.CodecType
}
return opt, nil
}

func NewClient(conn io.ReadWriteCloser, opts ...*Options) (*Client, error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
err = fmt.Errorf("invalid codec type %s", opt.CodecType)
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
// send options with server
if err = json.NewEncoder(conn).Encode(opt); err != nil {
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
_ = conn.Close()
return nil, err
}
return newClientCodec(f(conn), opt), nil
}

func newClientCodec(cc codec.Codec, opt *Options) *Client {
func newClientCodec(cc codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1, // seq starts with 1, 0 means invalid call
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
Expand All @@ -219,11 +218,19 @@ func newClientCodec(cc codec.Codec, opt *Options) *Client {
return client
}

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Options) (*Client, error) {
func dial(network, address string, opt *Option) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn, opts...)
return NewClient(conn, opt)
}

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
return dial(network, address, opt)
}
8 changes: 5 additions & 3 deletions gee-rpc/day3-service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (

const MagicNumber = 0x3bef5c

type Options struct {
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
}

var defaultOptions = &Options{
var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
}
Expand All @@ -45,7 +45,7 @@ var DefaultServer = NewServer()
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() { _ = conn.Close() }()
var opt Options
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error: ", err)
return
Expand Down Expand Up @@ -162,6 +162,8 @@ func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.
err := req.svc.call(req.mtype, req.argv, req.replyv)
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
Expand Down
Loading

0 comments on commit 1b78c37

Please sign in to comment.