From f0e86d4c31f558cc630379d42ec7655090921965 Mon Sep 17 00:00:00 2001 From: gzdaijie Date: Wed, 5 Feb 2020 16:33:26 +0800 Subject: [PATCH] day5 multi nodes finish main func & add a auto start shell script --- .../day2-single-node/geecache/geecache.go | 11 ++- .../geecache/geecache_test.go | 2 +- .../day3-http-server/geecache/geecache.go | 11 ++- .../geecache/geecache_test.go | 2 +- gee-cache/day3-http-server/geecache/http.go | 8 ++- gee-cache/day3-http-server/main.go | 10 ++- .../day4-consistent-hash/geecache/geecache.go | 11 ++- .../geecache/geecache_test.go | 2 +- .../day4-consistent-hash/geecache/http.go | 8 ++- gee-cache/day4-consistent-hash/main.go | 10 ++- .../day5-multi-nodes/geecache/geecache.go | 29 +++++--- .../geecache/geecache_test.go | 2 +- gee-cache/day5-multi-nodes/geecache/http.go | 28 +++----- gee-cache/day5-multi-nodes/geecache/peers.go | 28 -------- gee-cache/day5-multi-nodes/main.go | 67 +++++++++++++++++-- gee-cache/day5-multi-nodes/run.sh | 9 +++ 16 files changed, 169 insertions(+), 69 deletions(-) create mode 100755 gee-cache/day5-multi-nodes/run.sh diff --git a/gee-cache/day2-single-node/geecache/geecache.go b/gee-cache/day2-single-node/geecache/geecache.go index c2c8333..58b98b8 100644 --- a/gee-cache/day2-single-node/geecache/geecache.go +++ b/gee-cache/day2-single-node/geecache/geecache.go @@ -1,6 +1,10 @@ package geecache -import "sync" +import ( + "fmt" + "log" + "sync" +) // A Group is a cache namespace and associated data loaded spread over type Group struct { @@ -54,7 +58,12 @@ func GetGroup(name string) *Group { // Get value for a key from cache func (g *Group) Get(key string) (ByteView, error) { + if key == "" { + return ByteView{}, fmt.Errorf("key is required") + } + if v, ok := g.mainCache.get(key); ok { + log.Println("[GeeCache] hit") return v, nil } diff --git a/gee-cache/day2-single-node/geecache/geecache_test.go b/gee-cache/day2-single-node/geecache/geecache_test.go index 2bffb3f..9cb4079 100644 --- a/gee-cache/day2-single-node/geecache/geecache_test.go +++ b/gee-cache/day2-single-node/geecache/geecache_test.go @@ -15,7 +15,7 @@ var db = map[string]string{ func TestGet(t *testing.T) { gee := NewGroup("scores", 2<<10, GetterFunc( func(key string) ([]byte, error) { - log.Println("[group scores] search key", key) + log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } diff --git a/gee-cache/day3-http-server/geecache/geecache.go b/gee-cache/day3-http-server/geecache/geecache.go index c2c8333..58b98b8 100644 --- a/gee-cache/day3-http-server/geecache/geecache.go +++ b/gee-cache/day3-http-server/geecache/geecache.go @@ -1,6 +1,10 @@ package geecache -import "sync" +import ( + "fmt" + "log" + "sync" +) // A Group is a cache namespace and associated data loaded spread over type Group struct { @@ -54,7 +58,12 @@ func GetGroup(name string) *Group { // Get value for a key from cache func (g *Group) Get(key string) (ByteView, error) { + if key == "" { + return ByteView{}, fmt.Errorf("key is required") + } + if v, ok := g.mainCache.get(key); ok { + log.Println("[GeeCache] hit") return v, nil } diff --git a/gee-cache/day3-http-server/geecache/geecache_test.go b/gee-cache/day3-http-server/geecache/geecache_test.go index 2bffb3f..9cb4079 100644 --- a/gee-cache/day3-http-server/geecache/geecache_test.go +++ b/gee-cache/day3-http-server/geecache/geecache_test.go @@ -15,7 +15,7 @@ var db = map[string]string{ func TestGet(t *testing.T) { gee := NewGroup("scores", 2<<10, GetterFunc( func(key string) ([]byte, error) { - log.Println("[group scores] search key", key) + log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } diff --git a/gee-cache/day3-http-server/geecache/http.go b/gee-cache/day3-http-server/geecache/http.go index 4d33d5d..ef5e2f1 100644 --- a/gee-cache/day3-http-server/geecache/http.go +++ b/gee-cache/day3-http-server/geecache/http.go @@ -1,6 +1,7 @@ package geecache import ( + "fmt" "log" "net/http" "strings" @@ -23,12 +24,17 @@ func NewHTTPPool(self string) *HTTPPool { } } +// Log info with server name +func (p *HTTPPool) Log(format string, v ...interface{}) { + log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...)) +} + // ServeHTTP handle all http requests func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, p.basePath) { panic("HTTPPool serving unexpected path: " + r.URL.Path) } - log.Println("[geecache server]", r.Method, r.URL.Path) + p.Log("%s %s", r.Method, r.URL.Path) // /// required parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2) if len(parts) != 2 { diff --git a/gee-cache/day3-http-server/main.go b/gee-cache/day3-http-server/main.go index 570a90a..5442dd7 100644 --- a/gee-cache/day3-http-server/main.go +++ b/gee-cache/day3-http-server/main.go @@ -1,5 +1,13 @@ package main +/* +$ curl http://localhost:9999/_geecache/scores/Tom +630 + +$ curl http://localhost:9999/_geecache/scores/kkk +kkk not exist +*/ + import ( "fmt" "geecache" @@ -16,7 +24,7 @@ var db = map[string]string{ func main() { geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { - log.Println("[group scores] search key", key) + log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } diff --git a/gee-cache/day4-consistent-hash/geecache/geecache.go b/gee-cache/day4-consistent-hash/geecache/geecache.go index c2c8333..58b98b8 100644 --- a/gee-cache/day4-consistent-hash/geecache/geecache.go +++ b/gee-cache/day4-consistent-hash/geecache/geecache.go @@ -1,6 +1,10 @@ package geecache -import "sync" +import ( + "fmt" + "log" + "sync" +) // A Group is a cache namespace and associated data loaded spread over type Group struct { @@ -54,7 +58,12 @@ func GetGroup(name string) *Group { // Get value for a key from cache func (g *Group) Get(key string) (ByteView, error) { + if key == "" { + return ByteView{}, fmt.Errorf("key is required") + } + if v, ok := g.mainCache.get(key); ok { + log.Println("[GeeCache] hit") return v, nil } diff --git a/gee-cache/day4-consistent-hash/geecache/geecache_test.go b/gee-cache/day4-consistent-hash/geecache/geecache_test.go index 2bffb3f..9cb4079 100644 --- a/gee-cache/day4-consistent-hash/geecache/geecache_test.go +++ b/gee-cache/day4-consistent-hash/geecache/geecache_test.go @@ -15,7 +15,7 @@ var db = map[string]string{ func TestGet(t *testing.T) { gee := NewGroup("scores", 2<<10, GetterFunc( func(key string) ([]byte, error) { - log.Println("[group scores] search key", key) + log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } diff --git a/gee-cache/day4-consistent-hash/geecache/http.go b/gee-cache/day4-consistent-hash/geecache/http.go index 4d33d5d..ef5e2f1 100644 --- a/gee-cache/day4-consistent-hash/geecache/http.go +++ b/gee-cache/day4-consistent-hash/geecache/http.go @@ -1,6 +1,7 @@ package geecache import ( + "fmt" "log" "net/http" "strings" @@ -23,12 +24,17 @@ func NewHTTPPool(self string) *HTTPPool { } } +// Log info with server name +func (p *HTTPPool) Log(format string, v ...interface{}) { + log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...)) +} + // ServeHTTP handle all http requests func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, p.basePath) { panic("HTTPPool serving unexpected path: " + r.URL.Path) } - log.Println("[geecache server]", r.Method, r.URL.Path) + p.Log("%s %s", r.Method, r.URL.Path) // /// required parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2) if len(parts) != 2 { diff --git a/gee-cache/day4-consistent-hash/main.go b/gee-cache/day4-consistent-hash/main.go index 570a90a..5442dd7 100644 --- a/gee-cache/day4-consistent-hash/main.go +++ b/gee-cache/day4-consistent-hash/main.go @@ -1,5 +1,13 @@ package main +/* +$ curl http://localhost:9999/_geecache/scores/Tom +630 + +$ curl http://localhost:9999/_geecache/scores/kkk +kkk not exist +*/ + import ( "fmt" "geecache" @@ -16,7 +24,7 @@ var db = map[string]string{ func main() { geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { - log.Println("[group scores] search key", key) + log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } diff --git a/gee-cache/day5-multi-nodes/geecache/geecache.go b/gee-cache/day5-multi-nodes/geecache/geecache.go index db6e67a..dbd5c3e 100644 --- a/gee-cache/day5-multi-nodes/geecache/geecache.go +++ b/gee-cache/day5-multi-nodes/geecache/geecache.go @@ -1,6 +1,8 @@ package geecache import ( + "fmt" + "log" "sync" ) @@ -10,7 +12,6 @@ type Group struct { getter Getter mainCache cache peers PeerPicker - peersOnce sync.Once } // A Getter loads data for a key. @@ -58,16 +59,26 @@ func GetGroup(name string) *Group { // Get value for a key from cache func (g *Group) Get(key string) (ByteView, error) { - g.peersOnce.Do(func() { - g.peers = getPeers() - }) + if key == "" { + return ByteView{}, fmt.Errorf("key is required") + } + if v, ok := g.mainCache.get(key); ok { + log.Println("[GeeCache] hit") return v, nil } return g.load(key) } +// RegisterPeers registers a PeerPicker for choosing remote peer +func (g *Group) RegisterPeers(peers PeerPicker) { + if g.peers != nil { + panic("RegisterPeerPicker called more than once") + } + g.peers = peers +} + func cloneBytes(b []byte) []byte { c := make([]byte, len(b)) copy(c, b) @@ -75,10 +86,12 @@ func cloneBytes(b []byte) []byte { } func (g *Group) load(key string) (value ByteView, err error) { - if peer, ok := g.peers.PickPeer(key); ok { - value, err = g.getFromPeer(peer, key) - if err == nil { - return value, nil + if g.peers != nil { + if peer, ok := g.peers.PickPeer(key); ok { + if value, err = g.getFromPeer(peer, key); err == nil { + return value, nil + } + log.Println("[GeeCache] Failed to get from peer", err) } } diff --git a/gee-cache/day5-multi-nodes/geecache/geecache_test.go b/gee-cache/day5-multi-nodes/geecache/geecache_test.go index 2bffb3f..9cb4079 100644 --- a/gee-cache/day5-multi-nodes/geecache/geecache_test.go +++ b/gee-cache/day5-multi-nodes/geecache/geecache_test.go @@ -15,7 +15,7 @@ var db = map[string]string{ func TestGet(t *testing.T) { gee := NewGroup("scores", 2<<10, GetterFunc( func(key string) ([]byte, error) { - log.Println("[group scores] search key", key) + log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } diff --git a/gee-cache/day5-multi-nodes/geecache/http.go b/gee-cache/day5-multi-nodes/geecache/http.go index c108edf..d7da2c3 100644 --- a/gee-cache/day5-multi-nodes/geecache/http.go +++ b/gee-cache/day5-multi-nodes/geecache/http.go @@ -1,10 +1,9 @@ package geecache import ( - "bytes" "fmt" "geecache/consistenthash" - "io" + "io/ioutil" "log" "net/http" "net/url" @@ -29,12 +28,15 @@ type HTTPPool struct { // NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker. func NewHTTPPool(self string) *HTTPPool { - p := &HTTPPool{ + return &HTTPPool{ self: self, basePath: defaultBasePath, } - RegisterPeerPicker(func() PeerPicker { return p }) - return p +} + +// Log info with server name +func (p *HTTPPool) Log(format string, v ...interface{}) { + log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...)) } // ServeHTTP handle all http requests @@ -42,7 +44,7 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, p.basePath) { panic("HTTPPool serving unexpected path: " + r.URL.Path) } - log.Println("[geecache server]", r.Method, r.URL.Path) + p.Log("%s %s", r.Method, r.URL.Path) // /// required parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2) if len(parts) != 2 { @@ -86,6 +88,7 @@ func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) { p.mu.Lock() defer p.mu.Unlock() if peer := p.peers.Get(key); peer != "" && peer != p.self { + p.Log("Pick peer %s", peer) return p.httpGetters[peer], true } return nil, false @@ -97,10 +100,6 @@ type httpGetter struct { baseURL string } -var bufferPool = sync.Pool{ - New: func() interface{} { return new(bytes.Buffer) }, -} - func (h *httpGetter) Get(group string, key string) ([]byte, error) { u := fmt.Sprintf( "%v%v/%v", @@ -118,17 +117,12 @@ func (h *httpGetter) Get(group string, key string) ([]byte, error) { return nil, fmt.Errorf("server returned: %v", res.Status) } - b := bufferPool.Get().(*bytes.Buffer) - b.Reset() - defer bufferPool.Put(b) - - _, err = io.Copy(b, res.Body) - + bytes, err := ioutil.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("reading response body: %v", err) } - return b.Bytes(), nil + return bytes, nil } var _ PeerGetter = (*httpGetter)(nil) diff --git a/gee-cache/day5-multi-nodes/geecache/peers.go b/gee-cache/day5-multi-nodes/geecache/peers.go index cfc39f7..8d010e2 100644 --- a/gee-cache/day5-multi-nodes/geecache/peers.go +++ b/gee-cache/day5-multi-nodes/geecache/peers.go @@ -10,31 +10,3 @@ type PeerPicker interface { type PeerGetter interface { Get(group string, key string) ([]byte, error) } - -// NoPeers is an implementation of PeerPicker that never finds a peer. -type NoPeers struct{} - -// PickPeer return nothing -func (NoPeers) PickPeer(key string) (peer PeerGetter, ok bool) { return } - -var portPicker func() PeerPicker - -// RegisterPeerPicker registers the peer initialization function. -// It is called once, when the first group is created. -func RegisterPeerPicker(fn func() PeerPicker) { - if portPicker != nil { - panic("RegisterPeerPicker called more than once") - } - portPicker = fn -} - -func getPeers() PeerPicker { - if portPicker == nil { - return NoPeers{} - } - pk := portPicker() - if pk == nil { - pk = NoPeers{} - } - return pk -} diff --git a/gee-cache/day5-multi-nodes/main.go b/gee-cache/day5-multi-nodes/main.go index 570a90a..a99940c 100644 --- a/gee-cache/day5-multi-nodes/main.go +++ b/gee-cache/day5-multi-nodes/main.go @@ -1,6 +1,15 @@ package main +/* +$ curl "http://localhost:9999/api?key=Tom" +630 + +$ curl "http://localhost:9999/api?key=kkk" +kkk not exist +*/ + import ( + "flag" "fmt" "geecache" "log" @@ -13,18 +22,66 @@ var db = map[string]string{ "Sam": "567", } -func main() { - geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( +func createGroup() *geecache.Group { + return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { - log.Println("[group scores] search key", key) + log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } return nil, fmt.Errorf("%s not exist", key) })) +} - addr := "localhost:9999" +func startCacheServer(addr string, addrs []string, gee *geecache.Group) { peers := geecache.NewHTTPPool(addr) + peers.Set(addrs...) + gee.RegisterPeers(peers) log.Println("geecache is running at", addr) - log.Fatal(http.ListenAndServe(addr, peers)) + log.Fatal(http.ListenAndServe(addr[7:], peers)) +} + +func startAPIServer(apiAddr string, gee *geecache.Group) { + http.Handle("/api", http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + key := r.URL.Query().Get("key") + view, err := gee.Get(key) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Write(view.ByteSlice()) + + })) + log.Println("fontend server is running at", apiAddr) + log.Fatal(http.ListenAndServe(apiAddr[7:], nil)) + +} + +func main() { + var port int + var api bool + flag.IntVar(&port, "port", 8001, "Geecache server port") + flag.BoolVar(&api, "api", false, "Start a api server?") + flag.Parse() + + apiAddr := "http://localhost:9999" + addrMap := map[int]string{ + 8001: "http://localhost:8001", + 8002: "http://localhost:8002", + 8003: "http://localhost:8003", + } + + addrs := make([]string, 3) + + for _, v := range addrMap { + addrs = append(addrs, v) + } + + gee := createGroup() + if api { + go startAPIServer(apiAddr, gee) + } + startCacheServer(addrMap[port], []string(addrs), gee) } diff --git a/gee-cache/day5-multi-nodes/run.sh b/gee-cache/day5-multi-nodes/run.sh new file mode 100755 index 0000000..d19b4c8 --- /dev/null +++ b/gee-cache/day5-multi-nodes/run.sh @@ -0,0 +1,9 @@ +#!/bin/bash +trap "rm server;kill 0" EXIT + +go build -o server +./server -port=8001 & +./server -port=8002 & +./server -port=8003 -api=1 & + +wait \ No newline at end of file