Skip to content

Commit

Permalink
gee-rpc/day7 add registry
Browse files Browse the repository at this point in the history
  • Loading branch information
geektutu committed Oct 5, 2020
1 parent 4c368a9 commit b0c73e9
Show file tree
Hide file tree
Showing 32 changed files with 1,546 additions and 13 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

推荐先阅读 **[Go 语言简明教程](https://geektutu.com/post/quick-golang.html)**,一篇文章了解Go的基本语法、并发编程,依赖管理等内容。

另外推荐 **[Go 语言笔试面试题](https://geektutu.com/post/qa-golang.html)**,加深对 Go 语言的理解。

期待关注我的「[知乎专栏](https://zhuanlan.zhihu.com/geekgo)」和「[微博](http://weibo.com/geektutu)」,查看最近的文章和动态。

### 7天用Go从零实现Web框架 - Gee
Expand Down Expand Up @@ -50,6 +52,20 @@ gorm 准备推出完全重写的 v2 版本(目前还在开发中),相对 gorm-
- 第六天:[支持事务(Transaction)](https://geektutu.com/post/geeorm-day6.html) | [Code](gee-orm/day6-transaction)
- 第七天:[数据库迁移(Migrate)](https://geektutu.com/post/geeorm-day7.html) | [Code](gee-orm/day7-migrate)


### 7天用Go从零实现RPC框架 GeeRPC

[GeeRPC](https://geektutu.com/post/geerpc.html) 是一个基于 [net/rpc](https://github.com/golang/go/tree/master/src/net/rpc) 开发的 RPC 框架
GeeRPC 是基于 Go 语言标准库 `net/rpc` 实现的,添加了协议交换、服务注册与发现、负载均衡等功能,代码约 1k。

- 第一天 - [服务端与消息编码](https://geektutu.com/post/geerpc-day1.html) | [Code](gee-rpc/day1-codec)
- 第二天 - [支持并发与异步的客户端](https://geektutu.com/post/geerpc-day2.html) | [Code](gee-rpc/day2-client)
- 第三天 - [服务注册(service register)](https://geektutu.com/post/geerpc-day3.html) | [Code](gee-rpc/day3-service )
- 第四天 - [超时处理(timeout)](https://geektutu.com/post/geerpc-day4.html) | [Code](gee-rpc/day4-timeout )
- 第五天 - [支持HTTP协议](https://geektutu.com/post/geerpc-day5.html) | [Code](gee-rpc/day5-http-debug)
- 第六天 - [负载均衡(load balance)](https://geektutu.com/post/geerpc-day6.html) | [Code](gee-rpc/day6-load-balance)
- 第七天 - [服务发现与注册中心(registry)](https://geektutu.com/post/geerpc-day7.html) | [Code](gee-rpc/day7-registry)

### WebAssembly 使用示例

具体的实践过程记录在 [Go WebAssembly 简明教程](https://geektutu.com/post/quick-go-wasm.html)
Expand Down Expand Up @@ -102,6 +118,18 @@ Xorm's desgin is easier to understand than gorm-v1, so the main designs referenc
- Day 6 - Support Transaction | [Code](gee-orm/day6-transaction)
- Day 7 - Migrate Database | [Code](gee-orm/day7-migrate)

[GeeRPC](https://geektutu.com/post/geerpc.html) is a [net/rpc](https://github.com/golang/go/tree/master/src/net/rpc)-like RPC framework

Based on golang standard library `net/rpc`, GeeRPC implements more features. eg, protocol exchange, service registration and discovery, load balance, etc.

- Day 1 - Server Message Codec | [Code](gee-rpc/day1-codec)
- Day 2 - Concurrent Client | [Code](gee-rpc/day2-client)
- Day 3 - Service Register | [Code](gee-rpc/day3-service )
- Day 4 - Timeout Processing | [Code](gee-rpc/day4-timeout )
- Day 5 - Support HTTP Protocol | [Code](gee-rpc/day5-http-debug)
- Day 6 - Load Balance | [Code](gee-rpc/day6-load-balance)
- Day 7 - Discovery and Registry | [Code](gee-rpc/day7-registry)

## Golang WebAssembly Demo

- Demo 1 - Hello World [Code](demo-wasm/hello-world)
Expand Down
2 changes: 2 additions & 0 deletions gee-rpc/day1-codec/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"geerpc/codec"
"log"
"net"
"time"
)

func startServer(addr chan string) {
Expand All @@ -28,6 +29,7 @@ func main() {
conn, _ := net.Dial("tcp", <-addr)
defer func() { _ = conn.Close() }()

time.Sleep(time.Second)
// send options
_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
cc := codec.NewGobCodec(conn)
Expand Down
2 changes: 2 additions & 0 deletions gee-rpc/day2-client/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net"
"sync"
"time"
)

func startServer(addr chan string) {
Expand All @@ -25,6 +26,7 @@ func main() {
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
Expand Down
2 changes: 2 additions & 0 deletions gee-rpc/day3-service/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"net"
"sync"
"time"
)

type Foo int
Expand Down Expand Up @@ -37,6 +38,7 @@ func main() {
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
Expand Down
2 changes: 2 additions & 0 deletions gee-rpc/day4-timeout/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net"
"sync"
"time"
)

type Foo int
Expand Down Expand Up @@ -38,6 +39,7 @@ func main() {
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
Expand Down
2 changes: 2 additions & 0 deletions gee-rpc/day5-http-debug/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"net/http"
"sync"
"time"
)

type Foo int
Expand All @@ -31,6 +32,7 @@ func call(addrCh chan string) {
client, _ := geerpc.DialHTTP("tcp", <-addrCh)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func main() {

addr1 := <-ch1
addr2 := <-ch2

time.Sleep(time.Second)
call(addr1, addr2)
broadcast(addr1, addr2)
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package xclient

import (
"errors"
"math/rand"
"sync"
"time"
Expand All @@ -14,8 +15,10 @@ const (
)

type Discovery interface {
Get(mode SelectMode) string
All() []string
Refresh() error // refresh from remote registry
Update(servers []string) error
Get(mode SelectMode) (string, error)
GetAll() ([]string, error)
}

var _ Discovery = (*MultiServersDiscovery)(nil)
Expand All @@ -29,38 +32,46 @@ type MultiServersDiscovery struct {
index int // record the selected position for robin algorithm
}

// Refresh doesn't make sense for MultiServersDiscovery, so ignore it
func (d *MultiServersDiscovery) Refresh() error {
return nil
}

// Update the servers of discovery dynamically if needed
func (d *MultiServersDiscovery) Update(servers []string) {
func (d *MultiServersDiscovery) Update(servers []string) error {
d.mu.Lock()
defer d.mu.Unlock()
d.servers = servers
return nil
}

func (d *MultiServersDiscovery) Get(mode SelectMode) string {
// Get a server according to mode
func (d *MultiServersDiscovery) Get(mode SelectMode) (string, error) {
d.mu.Lock()
defer d.mu.Unlock()
if len(d.servers) == 0 {
return ""
return "", errors.New("rpc discovery: no available servers")
}
switch mode {
case RandomSelect:
return d.servers[d.r.Intn(len(d.servers))]
return d.servers[d.r.Intn(len(d.servers))], nil
case RoundRobinSelect:
s := d.servers[d.index]
d.index = (d.index + 1) % len(d.servers)
return s
return s, nil
default:
return ""
return "", errors.New("rpc discovery: not supported select mode")
}
}

func (d *MultiServersDiscovery) All() []string {
// returns all servers in discovery
func (d *MultiServersDiscovery) GetAll() ([]string, error) {
d.mu.RLock()
defer d.mu.RUnlock()
// return a copy of d.servers
servers := make([]string, len(d.servers), len(d.servers))
copy(servers, d.servers)
return servers
return servers, nil
}

// NewMultiServerDiscovery creates a MultiServersDiscovery instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,21 @@ func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod strin
// and returns its error status.
// xc will choose a proper server.
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
rpcAddr := xc.d.Get(xc.mode)
rpcAddr, err := xc.d.Get(xc.mode)
if err != nil {
return err
}
return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}

// Broadcast invokes the named function for every server registered in discovery
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply interface{}) error {
servers := xc.d.All()
servers, err := xc.d.GetAll()
if err != nil {
return err
}
var wg sync.WaitGroup
var mu sync.Mutex
var mu sync.Mutex // protect e and replyDone
var e error
replyDone := reply == nil // if reply is nil, don't need to set value
ctx, cancel := context.WithCancel(ctx)
Expand Down
Loading

0 comments on commit b0c73e9

Please sign in to comment.