From 7292c4ab946fb1abb63f04aa5b526f0f756d96bc Mon Sep 17 00:00:00 2001 From: gzdaijie Date: Tue, 4 Feb 2020 00:33:34 +0800 Subject: [PATCH] day5 multi node --- .../day2-single-node/geecache/geecache.go | 14 +- .../day3-http-server/geecache/geecache.go | 14 +- .../day4-consistent-hash/geecache/geecache.go | 14 +- .../day5-multi-nodes/geecache/byteview.go | 21 +++ gee-cache/day5-multi-nodes/geecache/cache.go | 35 +++++ .../geecache/consistenthash/consistenthash.go | 58 ++++++++ .../consistenthash/consistenthash_test.go | 43 ++++++ .../day5-multi-nodes/geecache/geecache.go | 109 ++++++++++++++ .../geecache/geecache_test.go | 48 +++++++ gee-cache/day5-multi-nodes/geecache/go.mod | 3 + gee-cache/day5-multi-nodes/geecache/http.go | 134 ++++++++++++++++++ .../day5-multi-nodes/geecache/lru/lru.go | 79 +++++++++++ .../day5-multi-nodes/geecache/lru/lru_test.go | 55 +++++++ gee-cache/day5-multi-nodes/geecache/peers.go | 40 ++++++ gee-cache/day5-multi-nodes/go.mod | 7 + gee-cache/day5-multi-nodes/main.go | 30 ++++ 16 files changed, 692 insertions(+), 12 deletions(-) create mode 100644 gee-cache/day5-multi-nodes/geecache/byteview.go create mode 100644 gee-cache/day5-multi-nodes/geecache/cache.go create mode 100644 gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash.go create mode 100644 gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash_test.go create mode 100644 gee-cache/day5-multi-nodes/geecache/geecache.go create mode 100644 gee-cache/day5-multi-nodes/geecache/geecache_test.go create mode 100644 gee-cache/day5-multi-nodes/geecache/go.mod create mode 100644 gee-cache/day5-multi-nodes/geecache/http.go create mode 100644 gee-cache/day5-multi-nodes/geecache/lru/lru.go create mode 100644 gee-cache/day5-multi-nodes/geecache/lru/lru_test.go create mode 100644 gee-cache/day5-multi-nodes/geecache/peers.go create mode 100644 gee-cache/day5-multi-nodes/go.mod create mode 100644 gee-cache/day5-multi-nodes/main.go diff --git a/gee-cache/day2-single-node/geecache/geecache.go b/gee-cache/day2-single-node/geecache/geecache.go index 4273256..c2c8333 100644 --- a/gee-cache/day2-single-node/geecache/geecache.go +++ b/gee-cache/day2-single-node/geecache/geecache.go @@ -68,12 +68,18 @@ func cloneBytes(b []byte) []byte { } func (g *Group) load(key string) (value ByteView, err error) { + return g.getLocally(key) +} + +func (g *Group) getLocally(key string) (ByteView, error) { bytes, err := g.getter.Get(key) - if err == nil { - value = ByteView{cloneBytes(bytes)} - g.populateCache(key, value) + if err != nil { + return ByteView{}, err + } - return + value := ByteView{b: cloneBytes(bytes)} + g.populateCache(key, value) + return value, nil } func (g *Group) populateCache(key string, value ByteView) { diff --git a/gee-cache/day3-http-server/geecache/geecache.go b/gee-cache/day3-http-server/geecache/geecache.go index 4273256..c2c8333 100644 --- a/gee-cache/day3-http-server/geecache/geecache.go +++ b/gee-cache/day3-http-server/geecache/geecache.go @@ -68,12 +68,18 @@ func cloneBytes(b []byte) []byte { } func (g *Group) load(key string) (value ByteView, err error) { + return g.getLocally(key) +} + +func (g *Group) getLocally(key string) (ByteView, error) { bytes, err := g.getter.Get(key) - if err == nil { - value = ByteView{cloneBytes(bytes)} - g.populateCache(key, value) + if err != nil { + return ByteView{}, err + } - return + value := ByteView{b: cloneBytes(bytes)} + g.populateCache(key, value) + return value, nil } func (g *Group) populateCache(key string, value ByteView) { diff --git a/gee-cache/day4-consistent-hash/geecache/geecache.go b/gee-cache/day4-consistent-hash/geecache/geecache.go index 4273256..c2c8333 100644 --- a/gee-cache/day4-consistent-hash/geecache/geecache.go +++ b/gee-cache/day4-consistent-hash/geecache/geecache.go @@ -68,12 +68,18 @@ func cloneBytes(b []byte) []byte { } func (g *Group) load(key string) (value ByteView, err error) { + return g.getLocally(key) +} + +func (g *Group) getLocally(key string) (ByteView, error) { bytes, err := g.getter.Get(key) - if err == nil { - value = ByteView{cloneBytes(bytes)} - g.populateCache(key, value) + if err != nil { + return ByteView{}, err + } - return + value := ByteView{b: cloneBytes(bytes)} + g.populateCache(key, value) + return value, nil } func (g *Group) populateCache(key string, value ByteView) { diff --git a/gee-cache/day5-multi-nodes/geecache/byteview.go b/gee-cache/day5-multi-nodes/geecache/byteview.go new file mode 100644 index 0000000..a51394f --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/byteview.go @@ -0,0 +1,21 @@ +package geecache + +// A ByteView holds an immutable view of bytes. +type ByteView struct { + b []byte +} + +// Len returns the view's length +func (v ByteView) Len() int { + return len(v.b) +} + +// ByteSlice returns a copy of the data as a byte slice. +func (v ByteView) ByteSlice() []byte { + return cloneBytes(v.b) +} + +// String returns the data as a string, making a copy if necessary. +func (v ByteView) String() string { + return string(v.b) +} diff --git a/gee-cache/day5-multi-nodes/geecache/cache.go b/gee-cache/day5-multi-nodes/geecache/cache.go new file mode 100644 index 0000000..703d033 --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/cache.go @@ -0,0 +1,35 @@ +package geecache + +import ( + "geecache/lru" + "sync" +) + +type cache struct { + mu sync.RWMutex + lru *lru.Cache + cacheBytes int64 +} + +func (c *cache) add(key string, value ByteView) { + c.mu.RLock() + defer c.mu.RUnlock() + if c.lru == nil { + c.lru = lru.New(c.cacheBytes, nil) + } + c.lru.Add(key, value) +} + +func (c *cache) get(key string) (value ByteView, ok bool) { + c.mu.RLock() + defer c.mu.RUnlock() + if c.lru == nil { + return + } + + if v, ok := c.lru.Get(key); ok { + return v.(ByteView), ok + } + + return +} diff --git a/gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash.go b/gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash.go new file mode 100644 index 0000000..c8c9082 --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash.go @@ -0,0 +1,58 @@ +package consistenthash + +import ( + "hash/crc32" + "sort" + "strconv" +) + +// Hash maps bytes to uint32 +type Hash func(data []byte) uint32 + +// Map constains all hashed keys +type Map struct { + hash Hash + replicas int + keys []int // Sorted + hashMap map[int]string +} + +// New creates a Map instance +func New(replicas int, fn Hash) *Map { + m := &Map{ + replicas: replicas, + hash: fn, + hashMap: make(map[int]string), + } + if m.hash == nil { + m.hash = crc32.ChecksumIEEE + } + return m +} + +// Add adds some keys to the hash. +func (m *Map) Add(keys ...string) { + for _, key := range keys { + for i := 0; i < m.replicas; i++ { + hash := int(m.hash([]byte(strconv.Itoa(i) + key))) + m.keys = append(m.keys, hash) + m.hashMap[hash] = key + } + } + sort.Ints(m.keys) +} + +// Get gets the closest item in the hash to the provided key. +func (m *Map) Get(key string) string { + if len(m.keys) == 0 { + return "" + } + + hash := int(m.hash([]byte(key))) + // Binary search for appropriate replica. + idx := sort.Search(len(m.keys), func(i int) bool { + return m.keys[i] >= hash + }) + + return m.hashMap[m.keys[idx%len(m.keys)]] +} diff --git a/gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash_test.go b/gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash_test.go new file mode 100644 index 0000000..34e1275 --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/consistenthash/consistenthash_test.go @@ -0,0 +1,43 @@ +package consistenthash + +import ( + "strconv" + "testing" +) + +func TestHashing(t *testing.T) { + hash := New(3, func(key []byte) uint32 { + i, _ := strconv.Atoi(string(key)) + return uint32(i) + }) + + // Given the above hash function, this will give replicas with "hashes": + // 2, 4, 6, 12, 14, 16, 22, 24, 26 + hash.Add("6", "4", "2") + + testCases := map[string]string{ + "2": "2", + "11": "2", + "23": "4", + "27": "2", + } + + for k, v := range testCases { + if hash.Get(k) != v { + t.Errorf("Asking for %s, should have yielded %s", k, v) + } + } + + // Adds 8, 18, 28 + hash.Add("8") + + // 27 should now map to 8. + testCases["27"] = "8" + + for k, v := range testCases { + if hash.Get(k) != v { + t.Errorf("Asking for %s, should have yielded %s", k, v) + } + } + +} diff --git a/gee-cache/day5-multi-nodes/geecache/geecache.go b/gee-cache/day5-multi-nodes/geecache/geecache.go new file mode 100644 index 0000000..db6e67a --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/geecache.go @@ -0,0 +1,109 @@ +package geecache + +import ( + "sync" +) + +// A Group is a cache namespace and associated data loaded spread over +type Group struct { + name string + getter Getter + mainCache cache + peers PeerPicker + peersOnce sync.Once +} + +// A Getter loads data for a key. +type Getter interface { + Get(key string) ([]byte, error) +} + +// A GetterFunc implements Getter with a function. +type GetterFunc func(key string) ([]byte, error) + +// Get implements Getter interface function +func (f GetterFunc) Get(key string) ([]byte, error) { + return f(key) +} + +var ( + mu sync.RWMutex + groups = make(map[string]*Group) +) + +// NewGroup create a new instance of Group +func NewGroup(name string, cacheBytes int64, getter Getter) *Group { + if getter == nil { + panic("nil Getter") + } + mu.Lock() + defer mu.Unlock() + g := &Group{ + name: name, + getter: getter, + mainCache: cache{cacheBytes: cacheBytes}, + } + groups[name] = g + return g +} + +// GetGroup returns the named group previously created with NewGroup, or +// nil if there's no such group. +func GetGroup(name string) *Group { + mu.RLock() + g := groups[name] + mu.RUnlock() + return g +} + +// Get value for a key from cache +func (g *Group) Get(key string) (ByteView, error) { + g.peersOnce.Do(func() { + g.peers = getPeers() + }) + if v, ok := g.mainCache.get(key); ok { + return v, nil + } + + return g.load(key) +} + +func cloneBytes(b []byte) []byte { + c := make([]byte, len(b)) + copy(c, b) + return c +} + +func (g *Group) load(key string) (value ByteView, err error) { + if peer, ok := g.peers.PickPeer(key); ok { + value, err = g.getFromPeer(peer, key) + if err == nil { + return value, nil + } + } + + return g.getLocally(key) +} + +func (g *Group) populateCache(key string, value ByteView) { + g.mainCache.add(key, value) +} + +func (g *Group) getLocally(key string) (ByteView, error) { + bytes, err := g.getter.Get(key) + if err != nil { + return ByteView{}, err + + } + value := ByteView{b: cloneBytes(bytes)} + g.populateCache(key, value) + return value, nil +} + +func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) { + bytes, err := peer.Get(g.name, key) + if err != nil { + return ByteView{}, err + } + return ByteView{b: bytes}, nil +} diff --git a/gee-cache/day5-multi-nodes/geecache/geecache_test.go b/gee-cache/day5-multi-nodes/geecache/geecache_test.go new file mode 100644 index 0000000..2bffb3f --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/geecache_test.go @@ -0,0 +1,48 @@ +package geecache + +import ( + "fmt" + "log" + "testing" +) + +var db = map[string]string{ + "Tom": "630", + "Jack": "589", + "Sam": "567", +} + +func TestGet(t *testing.T) { + gee := NewGroup("scores", 2<<10, GetterFunc( + func(key string) ([]byte, error) { + log.Println("[group scores] search key", key) + if v, ok := db[key]; ok { + return []byte(v), nil + } + return nil, fmt.Errorf("%s not exist", key) + })) + + for k, v := range db { + view, err := gee.Get(k) + if err != nil || view.String() != v { + t.Fatal("failed to get value of Tom") + } + } + + if view, err := gee.Get("unknown"); err == nil { + t.Fatalf("the value of unknow should be empty, but %s got", view) + } +} + +func TestGetGroup(t *testing.T) { + groupName := "scores" + NewGroup(groupName, 2<<10, GetterFunc( + func(key string) (bytes []byte, err error) { return })) + if group := GetGroup(groupName); group == nil || group.name != groupName { + t.Fatalf("group %s not exist", groupName) + } + + if group := GetGroup(groupName + "111"); group != nil { + t.Fatalf("expect nil, but %s got", group.name) + } +} diff --git a/gee-cache/day5-multi-nodes/geecache/go.mod b/gee-cache/day5-multi-nodes/geecache/go.mod new file mode 100644 index 0000000..f9d454e --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/go.mod @@ -0,0 +1,3 @@ +module geecache + +go 1.13 diff --git a/gee-cache/day5-multi-nodes/geecache/http.go b/gee-cache/day5-multi-nodes/geecache/http.go new file mode 100644 index 0000000..c108edf --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/http.go @@ -0,0 +1,134 @@ +package geecache + +import ( + "bytes" + "fmt" + "geecache/consistenthash" + "io" + "log" + "net/http" + "net/url" + "strings" + "sync" +) + +const ( + defaultBasePath = "/_geecache/" + defaultReplicas = 50 +) + +// HTTPPool implements PeerPicker for a pool of HTTP peers. +type HTTPPool struct { + // this peer's base URL, e.g. "https://example.net:8000" + self string + basePath string + mu sync.Mutex // guards peers and httpGetters + peers *consistenthash.Map + httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008" +} + +// NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker. +func NewHTTPPool(self string) *HTTPPool { + p := &HTTPPool{ + self: self, + basePath: defaultBasePath, + } + RegisterPeerPicker(func() PeerPicker { return p }) + return p +} + +// ServeHTTP handle all http requests +func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, p.basePath) { + panic("HTTPPool serving unexpected path: " + r.URL.Path) + } + log.Println("[geecache server]", r.Method, r.URL.Path) + // /// required + parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2) + if len(parts) != 2 { + http.Error(w, "bad request", http.StatusBadRequest) + return + } + + groupName := parts[0] + key := parts[1] + + group := GetGroup(groupName) + if group == nil { + http.Error(w, "no such group: "+groupName, http.StatusNotFound) + return + } + + view, err := group.Get(key) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/octet-stream") + w.Write(view.ByteSlice()) +} + +// Set updates the pool's list of peers. +func (p *HTTPPool) Set(peers ...string) { + p.mu.Lock() + defer p.mu.Unlock() + p.peers = consistenthash.New(defaultReplicas, nil) + p.peers.Add(peers...) + p.httpGetters = make(map[string]*httpGetter, len(peers)) + for _, peer := range peers { + p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath} + } +} + +// PickPeer picks a peer according to key +func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) { + p.mu.Lock() + defer p.mu.Unlock() + if peer := p.peers.Get(key); peer != "" && peer != p.self { + return p.httpGetters[peer], true + } + return nil, false +} + +var _ PeerPicker = (*HTTPPool)(nil) + +type httpGetter struct { + baseURL string +} + +var bufferPool = sync.Pool{ + New: func() interface{} { return new(bytes.Buffer) }, +} + +func (h *httpGetter) Get(group string, key string) ([]byte, error) { + u := fmt.Sprintf( + "%v%v/%v", + h.baseURL, + url.QueryEscape(group), + url.QueryEscape(key), + ) + res, err := http.Get(u) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("server returned: %v", res.Status) + } + + b := bufferPool.Get().(*bytes.Buffer) + b.Reset() + defer bufferPool.Put(b) + + _, err = io.Copy(b, res.Body) + + if err != nil { + return nil, fmt.Errorf("reading response body: %v", err) + } + + return b.Bytes(), nil +} + +var _ PeerGetter = (*httpGetter)(nil) diff --git a/gee-cache/day5-multi-nodes/geecache/lru/lru.go b/gee-cache/day5-multi-nodes/geecache/lru/lru.go new file mode 100644 index 0000000..81eee43 --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/lru/lru.go @@ -0,0 +1,79 @@ +package lru + +import "container/list" + +// Cache is a LRU cache. It is not safe for concurrent access. +type Cache struct { + maxBytes int64 + nbytes int64 + ll *list.List + cache map[string]*list.Element + // optional and executed when an entry is purged. + OnEvicted func(key string, value Value) +} + +type entry struct { + key string + value Value +} + +// Value use Len to count how many bytes it takes +type Value interface { + Len() int +} + +// New is the Constructor of Cache +func New(maxBytes int64, onEvicted func(string, Value)) *Cache { + return &Cache{ + maxBytes: maxBytes, + ll: list.New(), + cache: make(map[string]*list.Element), + OnEvicted: onEvicted, + } +} + +// Add adds a value to the cache. +func (c *Cache) Add(key string, value Value) { + if ele, ok := c.cache[key]; ok { + c.ll.MoveToFront(ele) + kv := ele.Value.(*entry) + kv.value = value + return + } + ele := c.ll.PushFront(&entry{key, value}) + c.cache[key] = ele + c.nbytes += int64(len(key)) + int64(value.Len()) + + for c.maxBytes != 0 && c.maxBytes < c.nbytes { + c.RemoveOldest() + } +} + +// Get look ups a key's value +func (c *Cache) Get(key string) (value Value, ok bool) { + if ele, ok := c.cache[key]; ok { + c.ll.MoveToFront(ele) + kv := ele.Value.(*entry) + return kv.value, true + } + return +} + +// RemoveOldest removes the oldest item +func (c *Cache) RemoveOldest() { + ele := c.ll.Back() + if ele != nil { + c.ll.Remove(ele) + kv := ele.Value.(*entry) + delete(c.cache, kv.key) + c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len()) + if c.OnEvicted != nil { + c.OnEvicted(kv.key, kv.value) + } + } +} + +// Len the number of cache entries +func (c *Cache) Len() int { + return c.ll.Len() +} diff --git a/gee-cache/day5-multi-nodes/geecache/lru/lru_test.go b/gee-cache/day5-multi-nodes/geecache/lru/lru_test.go new file mode 100644 index 0000000..7308322 --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/lru/lru_test.go @@ -0,0 +1,55 @@ +package lru + +import ( + "reflect" + "testing" +) + +type String string + +func (d String) Len() int { + return len(d) +} + +func TestGet(t *testing.T) { + lru := New(int64(0), nil) + lru.Add("key1", String("1234")) + if v, ok := lru.Get("key1"); !ok || string(v.(String)) != "1234" { + t.Fatalf("cache hit key1=1234 failed") + } + if _, ok := lru.Get("key2"); ok { + t.Fatalf("cache miss key2 failed") + } +} + +func TestRemoveoldest(t *testing.T) { + k1, k2, k3 := "key1", "key2", "k3" + v1, v2, v3 := "value1", "value2", "v3" + cap := len(k1 + k2 + v1 + v2) + lru := New(int64(cap), nil) + lru.Add(k1, String(v1)) + lru.Add(k2, String(v2)) + lru.Add(k3, String(v3)) + + if _, ok := lru.Get("key1"); ok || lru.Len() != 2 { + t.Fatalf("Removeoldest key1 failed") + } +} + +func TestOnEvicted(t *testing.T) { + keys := make([]string, 0) + callback := func(key string, value Value) { + keys = append(keys, key) + } + lru := New(int64(10), callback) + lru.Add("key1", String("123456")) + lru.Add("k2", String("k2")) + lru.Add("k3", String("k3")) + lru.Add("k4", String("k4")) + + expect := []string{"key1", "k2"} + + if !reflect.DeepEqual(expect, keys) { + t.Fatalf("Call OnEvicted failed, expect keys equals to %s", expect) + } +} diff --git a/gee-cache/day5-multi-nodes/geecache/peers.go b/gee-cache/day5-multi-nodes/geecache/peers.go new file mode 100644 index 0000000..cfc39f7 --- /dev/null +++ b/gee-cache/day5-multi-nodes/geecache/peers.go @@ -0,0 +1,40 @@ +package geecache + +// PeerPicker is the interface that must be implemented to locate +// the peer that owns a specific key. +type PeerPicker interface { + PickPeer(key string) (peer PeerGetter, ok bool) +} + +// PeerGetter is the interface that must be implemented by a peer. +type PeerGetter interface { + Get(group string, key string) ([]byte, error) +} + +// NoPeers is an implementation of PeerPicker that never finds a peer. +type NoPeers struct{} + +// PickPeer return nothing +func (NoPeers) PickPeer(key string) (peer PeerGetter, ok bool) { return } + +var portPicker func() PeerPicker + +// RegisterPeerPicker registers the peer initialization function. +// It is called once, when the first group is created. +func RegisterPeerPicker(fn func() PeerPicker) { + if portPicker != nil { + panic("RegisterPeerPicker called more than once") + } + portPicker = fn +} + +func getPeers() PeerPicker { + if portPicker == nil { + return NoPeers{} + } + pk := portPicker() + if pk == nil { + pk = NoPeers{} + } + return pk +} diff --git a/gee-cache/day5-multi-nodes/go.mod b/gee-cache/day5-multi-nodes/go.mod new file mode 100644 index 0000000..d0fd3ba --- /dev/null +++ b/gee-cache/day5-multi-nodes/go.mod @@ -0,0 +1,7 @@ +module example + +go 1.13 + +require geecache v0.0.0 + +replace geecache => ./geecache diff --git a/gee-cache/day5-multi-nodes/main.go b/gee-cache/day5-multi-nodes/main.go new file mode 100644 index 0000000..570a90a --- /dev/null +++ b/gee-cache/day5-multi-nodes/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "geecache" + "log" + "net/http" +) + +var db = map[string]string{ + "Tom": "630", + "Jack": "589", + "Sam": "567", +} + +func main() { + geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( + func(key string) ([]byte, error) { + log.Println("[group scores] search key", key) + if v, ok := db[key]; ok { + return []byte(v), nil + } + return nil, fmt.Errorf("%s not exist", key) + })) + + addr := "localhost:9999" + peers := geecache.NewHTTPPool(addr) + log.Println("geecache is running at", addr) + log.Fatal(http.ListenAndServe(addr, peers)) +}