Skip to content

Commit

Permalink
gee-rpc day6 registry
Browse files Browse the repository at this point in the history
  • Loading branch information
geektutu committed Oct 4, 2020
1 parent 1b78c37 commit c2cdbf7
Show file tree
Hide file tree
Showing 21 changed files with 1,322 additions and 123 deletions.
31 changes: 20 additions & 11 deletions gee-rpc/day2-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ func (call *Call) done() {
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closed bool // user has called Close
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}

var _ io.Closer = (*Client)(nil)
Expand All @@ -52,17 +53,24 @@ var ErrShutdown = errors.New("connection is shut down")
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
if client.closing {
return ErrShutdown
}
client.closed = true
client.closing = true
return client.cc.Close()
}

// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}

func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
if client.closing || client.shutdown {
return 0, ErrShutdown
}
seq := client.seq
Expand All @@ -84,6 +92,7 @@ func (client *Client) terminateCalls(err error) {
defer client.sending.Unlock()
client.mu.Lock()
defer client.mu.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
Expand Down
31 changes: 20 additions & 11 deletions gee-rpc/day3-service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ func (call *Call) done() {
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closed bool // user has called Close
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}

var _ io.Closer = (*Client)(nil)
Expand All @@ -52,17 +53,24 @@ var ErrShutdown = errors.New("connection is shut down")
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
if client.closing {
return ErrShutdown
}
client.closed = true
client.closing = true
return client.cc.Close()
}

// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}

func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
if client.closing || client.shutdown {
return 0, ErrShutdown
}
seq := client.seq
Expand All @@ -84,6 +92,7 @@ func (client *Client) terminateCalls(err error) {
defer client.sending.Unlock()
client.mu.Lock()
defer client.mu.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
Expand Down
73 changes: 41 additions & 32 deletions gee-rpc/day4-timeout/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ func (call *Call) done() {
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closed bool // user has called Close
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}

var _ io.Closer = (*Client)(nil)
Expand All @@ -54,17 +55,24 @@ var ErrShutdown = errors.New("connection is shut down")
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
if client.closing {
return ErrShutdown
}
client.closed = true
client.closing = true
return client.cc.Close()
}

// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}

func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
if client.closing || client.shutdown {
return 0, ErrShutdown
}
seq := client.seq
Expand All @@ -86,6 +94,7 @@ func (client *Client) terminateCalls(err error) {
defer client.sending.Unlock()
client.mu.Lock()
defer client.mu.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
Expand Down Expand Up @@ -226,44 +235,44 @@ func newClientCodec(cc codec.Codec, opt *Option) *Client {
return client
}

func dial(network, address string, opt *Option) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn, opt)
}

type clientResult struct {
client *Client
err error
}

func dialTimeout(f func() (client *Client, err error), timeout time.Duration) (*Client, error) {
if timeout == 0 {
return f()
type dialFunc func(network, address string, opt *Option) (client *Client, err error)

func dialTimeout(f dialFunc, network, address string, opts ...*Option) (*Client, error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
ch := make(chan clientResult)
go func() {
client, err := f()
client, err := f(network, address, opt)
ch <- clientResult{client: client, err: err}
}()
if opt.ConnectTimeout == 0 {
result := <-ch
return result.client, result.err
}
select {
case <-time.After(timeout):
return nil, fmt.Errorf("rpc client: dial timeout: expect within %s", timeout)
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: dial timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch:
return result.client, result.err
}
}

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
opt, err := parseOptions(opts...)
func dial(network, address string, opt *Option) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
f := func() (client *Client, err error) {
return dial(network, address, opt)
}
return dialTimeout(f, opt.ConnectTimeout)
return NewClient(conn, opt)
}

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
return dialTimeout(dial, network, address, opts...)
}
6 changes: 3 additions & 3 deletions gee-rpc/day4-timeout/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ func startServer(addr chan string) {

func TestClient_dialTimeout(t *testing.T) {
t.Parallel()
f := func() (client *Client, err error) {
f := func(network, address string, opt *Option) (client *Client, err error) {
time.Sleep(time.Second * 2)
return nil, nil
}
t.Run("timeout", func(t *testing.T) {
_, err := dialTimeout(f, time.Second)
_, err := dialTimeout(f, "", "", &Option{ConnectTimeout: time.Second})
_assert(err != nil && strings.Contains(err.Error(), "dial timeout"), "expect a timeout error")
})
t.Run("0", func(t *testing.T) {
_, err := dialTimeout(f, 0)
_, err := dialTimeout(f, "", "", &Option{ConnectTimeout: 0})
_assert(err == nil, "0 means no limit")
})
}
Expand Down
Loading

0 comments on commit c2cdbf7

Please sign in to comment.