diff --git a/README.md b/README.md index 474a315..67292e4 100644 --- a/README.md +++ b/README.md @@ -33,9 +33,7 @@ AeLoop(Async Event Loop) 是 Redis 的核心异步事件驱动机制,主要有 ### 数据结构 -rotom 在数据结构上做了许多优化,当 hash 和 set 较小时,使用空间紧凑的 `zipmap` 和 `zipset` 以优化内存效率,并在适时使用 `lz4` 压缩算法压缩较冷数据,以进一步节省内存。 - -其中 `zipmap` 和 `zipset` 以及 `quicklist` 都基于 `listpack`, 这是 Redis 7.0+ 提出的新型压缩列表,支持正序及逆序遍历。 +rotom 在数据结构上做了许多优化,当 hash 和 set 较小时,使用空间紧凑的 `zipmap` 和 `zipset` 以优化内存效率。它们都基于 `listpack`, 这是 Redis 5.0+ 提出的新型压缩列表,支持正序及逆序遍历。 ### 计划 diff --git a/command.go b/command.go index a8861b3..ab6cc4a 100644 --- a/command.go +++ b/command.go @@ -96,8 +96,8 @@ func setCommand(writer *RESPWriter, args []RESP) { func incrCommand(writer *RESPWriter, args []RESP) { key := args[0].ToString() - object, ok := db.dict.Get(key) - if !ok { + object, ttl := db.dict.Get(key) + if ttl == dict.KEY_NOT_EXIST { db.dict.Set(key, dict.TypeInt, 1) writer.WriteInteger(1) return @@ -128,8 +128,8 @@ func incrCommand(writer *RESPWriter, args []RESP) { func getCommand(writer *RESPWriter, args []RESP) { key := args[0].ToStringUnsafe() - object, ok := db.dict.Get(key) - if !ok { + object, ttl := db.dict.Get(key) + if ttl == dict.KEY_NOT_EXIST { writer.WriteNull() return } @@ -431,8 +431,8 @@ func fetchZSet(key string, setnx ...bool) (ZSet, error) { } func fetch[T any](key string, typ dict.Type, new func() T, setnx ...bool) (v T, err error) { - object, ok := db.dict.Get(key) - if ok { + object, ttl := db.dict.Get(key) + if ttl != dict.KEY_NOT_EXIST { if object.Type() != typ { return v, errWrongType } diff --git a/go.mod b/go.mod index 2d946b2..b2c4ab5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964 github.com/deckarep/golang-set/v2 v2.6.0 github.com/influxdata/tdigest v0.0.1 - github.com/pierrec/lz4/v4 v4.1.21 github.com/redis/go-redis/v9 v9.5.2 github.com/rs/zerolog v1.33.0 github.com/sakeven/RbTree v1.1.1 diff --git a/go.sum b/go.sum index 1d87431..09b860f 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/internal/dict/dict.go b/internal/dict/dict.go index 5a98e9e..ba43533 100644 --- a/internal/dict/dict.go +++ b/internal/dict/dict.go @@ -7,6 +7,11 @@ import ( "github.com/cockroachdb/swiss" ) +const ( + TTL_DEFAULT = -1 + KEY_NOT_EXIST = -2 +) + var ( _sec atomic.Uint32 _nsec atomic.Int64 @@ -36,32 +41,26 @@ func New() *Dict { } } -func (dict *Dict) Get(key string) (*Object, bool) { +func (dict *Dict) Get(key string) (*Object, int) { object, ok := dict.data.Get(key) if !ok { - return nil, false + // key not exist + return nil, KEY_NOT_EXIST } + object.lastAccessd = _sec.Load() if object.hasTTL { - nsecTTL, ok := dict.expire.Get(key) - if !ok || nsecTTL < _nsec.Load() { - // expired + nsec, _ := dict.expire.Get(key) + // key expired + if nsec < _nsec.Load() { dict.data.Delete(key) dict.expire.Delete(key) - return nil, false + return nil, KEY_NOT_EXIST } + return object, nsec2duration(nsec) } - switch object.typ { - case TypeZipMapC, TypeZipSetC: - object.data.(Compressor).Decompress() - object.typ -= 1 - } - - // update access time - object.lastAccessd = _sec.Load() - - return object, true + return object, TTL_DEFAULT } func (dict *Dict) Set(key string, typ Type, data any) { @@ -72,28 +71,56 @@ func (dict *Dict) Set(key string, typ Type, data any) { }) } -func (dict *Dict) Remove(key string) bool { - _, ok := dict.data.Get(key) - dict.data.Delete(key) - dict.expire.Delete(key) - return ok +func (dict *Dict) SetWithTTL(key string, typ Type, data any, ttl int64) { + dict.data.Put(key, &Object{ + typ: typ, + lastAccessd: _sec.Load(), + data: data, + hasTTL: true, + }) + dict.expire.Put(key, ttl) } -func (dict *Dict) SetTTL(key string, expiration int64) bool { +func (dict *Dict) Delete(key string) bool { object, ok := dict.data.Get(key) if !ok { return false } - object.hasTTL = true - dict.expire.Put(key, expiration) + dict.data.Delete(key) + if object.hasTTL { + dict.expire.Delete(key) + } return true } +// SetTTL set expire time for key. +// return `0` if key not exist or expired. +// return `1` if set successed. +func (dict *Dict) SetTTL(key string, ttl int64) int { + object, ok := dict.data.Get(key) + if !ok { + // key not exist + return 0 + } + if object.hasTTL { + nsec, _ := dict.expire.Get(key) + // key expired + if nsec < _nsec.Load() { + dict.data.Delete(key) + dict.expire.Delete(key) + return 0 + } + } + // set ttl + object.hasTTL = true + dict.expire.Put(key, ttl) + return 1 +} + func (dict *Dict) EvictExpired() { - nanosec := time.Now().UnixNano() - count := 0 - dict.expire.All(func(key string, value int64) bool { - if nanosec > value { + var count int + dict.expire.All(func(key string, nsec int64) bool { + if _nsec.Load() > nsec { dict.expire.Delete(key) dict.data.Delete(key) } diff --git a/internal/dict/dict_test.go b/internal/dict/dict_test.go index d0f9c95..b49c984 100644 --- a/internal/dict/dict_test.go +++ b/internal/dict/dict_test.go @@ -1,40 +1,58 @@ package dict import ( - "fmt" - "math/rand/v2" "testing" + "time" "github.com/stretchr/testify/assert" ) -func genKV(i int) (string, []byte) { - k := fmt.Sprintf("%08x", i) - return k, []byte(k) -} - func TestDict(t *testing.T) { assert := assert.New(t) - dict := New() - dict.Set("key1", TypeString, []byte("hello")) - object, ok := dict.Get("key1") - assert.True(ok) - assert.Equal(object.Data(), []byte("hello")) - assert.Equal(object.Type(), TypeString) -} + t.Run("set", func(t *testing.T) { + dict := New() + dict.Set("key", TypeString, []byte("hello")) -func TestDictMultiSet(t *testing.T) { - assert := assert.New(t) - dict := New() + object, ttl := dict.Get("key") + assert.Equal(ttl, TTL_DEFAULT) + assert.Equal(object.Data(), []byte("hello")) + assert.Equal(object.Type(), TypeString) + + object, ttl = dict.Get("none") + assert.Nil(object) + assert.Equal(ttl, KEY_NOT_EXIST) + }) + + t.Run("setTTL", func(t *testing.T) { + dict := New() + dict.SetWithTTL("key", TypeString, []byte("hello"), time.Now().Add(time.Minute).UnixNano()) - for i := 0; i < 10000; i++ { - key, value := genKV(rand.Int()) - dict.Set(key, TypeString, value) + object, ttl := dict.Get("key") + assert.Equal(ttl, 60) + assert.Equal(object.Data(), []byte("hello")) + assert.Equal(object.Type(), TypeString) - object, ok := dict.Get(key) + ttl = dict.SetTTL("key", time.Now().Add(-time.Second).UnixNano()) + assert.Equal(ttl, 1) + + ttl = dict.SetTTL("not-exist", TTL_DEFAULT) + assert.Equal(ttl, 0) + + // expired + object, ttl = dict.Get("key") + assert.Equal(ttl, KEY_NOT_EXIST) + assert.Nil(object) + }) + + t.Run("delete", func(t *testing.T) { + dict := New() + dict.Set("key", TypeString, []byte("hello")) + + ok := dict.Delete("key") assert.True(ok) - assert.Equal(object.typ, TypeString) - assert.Equal(object.data.([]byte), value) - } + + ok = dict.Delete("none") + assert.False(ok) + }) } diff --git a/internal/dict/object.go b/internal/dict/object.go index 81c0213..765f512 100644 --- a/internal/dict/object.go +++ b/internal/dict/object.go @@ -1,5 +1,7 @@ package dict +import "time" + // Type defines all rotom data types. type Type byte @@ -8,10 +10,8 @@ const ( TypeInt TypeMap TypeZipMap - TypeZipMapC // compressed zipmap TypeSet TypeZipSet - TypeZipSetC // compressed zipset TypeList TypeZSet ) @@ -34,3 +34,7 @@ func (o *Object) Type() Type { return o.typ } func (o *Object) Data() any { return o.data } func (o *Object) SetData(data any) { o.data = data } + +func nsec2duration(nsec int64) (second int) { + return int(nsec-_nsec.Load()) / int(time.Second) +} diff --git a/internal/hash/benchmark/main.go b/internal/hash/benchmark/main.go index ac8f04f..10f9f79 100644 --- a/internal/hash/benchmark/main.go +++ b/internal/hash/benchmark/main.go @@ -55,17 +55,6 @@ func main() { } m[i] = hm } - - case "zipmap-compressed": - for i := 0; i < 10000; i++ { - hm := hash.NewZipMap() - for i := 0; i < 512; i++ { - k, v := genKV(i) - hm.Set(k, v) - } - hm.Compress() - m[i] = hm - } } cost := time.Since(start) diff --git a/internal/hash/map_test.go b/internal/hash/map_test.go index 740da45..1e0a59f 100644 --- a/internal/hash/map_test.go +++ b/internal/hash/map_test.go @@ -100,9 +100,6 @@ func TestToMap(t *testing.T) { m.Set("key2", []byte("value2")) m.Set("key3", []byte("value3")) - m.Compress() - m.Decompress() - nm := m.ToMap() // scan diff --git a/internal/hash/set_test.go b/internal/hash/set_test.go index 8a9c35d..e1015c1 100644 --- a/internal/hash/set_test.go +++ b/internal/hash/set_test.go @@ -77,9 +77,6 @@ func TestToSet(t *testing.T) { m.Add("key2") m.Add("key3") - m.Compress() - m.Decompress() - nm := m.ToSet() // scan diff --git a/internal/hash/zipmap.go b/internal/hash/zipmap.go index a7b8e11..fbbae94 100644 --- a/internal/hash/zipmap.go +++ b/internal/hash/zipmap.go @@ -112,10 +112,6 @@ func (zm *ZipMap) ToMap() *Map { func (zm *ZipMap) Len() int { return zm.m.Size() } -func (zm *ZipMap) Compress() { zm.m.Compress() } - -func (zm *ZipMap) Decompress() { zm.m.Decompress() } - func b2s(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } diff --git a/internal/hash/zipset.go b/internal/hash/zipset.go index e566fb4..90664dd 100644 --- a/internal/hash/zipset.go +++ b/internal/hash/zipset.go @@ -55,10 +55,6 @@ func (zs *ZipSet) Pop() (string, bool) { return zs.m.RPop() } func (zs *ZipSet) Len() int { return zs.m.Size() } -func (zs *ZipSet) Compress() { zs.m.Compress() } - -func (zs *ZipSet) Decompress() { zs.m.Decompress() } - func (zs *ZipSet) ToSet() *Set { s := NewSet() zs.Scan(func(key string) { diff --git a/internal/list/bench_test.go b/internal/list/bench_test.go index 70da9dd..18d03fb 100644 --- a/internal/list/bench_test.go +++ b/internal/list/bench_test.go @@ -46,17 +46,3 @@ func BenchmarkList(b *testing.B) { } }) } - -func BenchmarkCompress(b *testing.B) { - b.Run("compress", func(b *testing.B) { - lp := NewListPack() - for i := 0; i < 256; i++ { - lp.RPush("Hello", "World") - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - lp.Compress() - lp.Decompress() - } - }) -} diff --git a/internal/list/listpack.go b/internal/list/listpack.go index 95b1090..bb99aa3 100644 --- a/internal/list/listpack.go +++ b/internal/list/listpack.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "slices" - "github.com/pierrec/lz4/v4" "github.com/xgzlucario/rotom/internal/utils" ) @@ -14,8 +13,6 @@ const ( var ( bpool = utils.NewBufferPool() - - c lz4.Compressor ) // ListPack is a lists of strings serialization format on Redis. @@ -35,9 +32,8 @@ var ( Using this structure, it is fast to iterate from both sides. */ type ListPack struct { - srcLen uint32 // srcLen is data size before compressed. - size uint32 - data []byte + size uint32 + data []byte } func NewListPack() *ListPack { @@ -78,43 +74,12 @@ func (lp *ListPack) RPop() (val string, ok bool) { return } -func (lp *ListPack) Compress() { - if lp.srcLen > 0 { - return - } - if len(lp.data) == 0 { - return - } - lp.srcLen = uint32(len(lp.data)) - - dst := bpool.Get(lz4.CompressBlockBound(len(lp.data))) - n, _ := c.CompressBlock(lp.data, dst) - - bpool.Put(lp.data) - lp.data = dst[:n] -} - -func (lp *ListPack) Decompress() { - if lp.srcLen == 0 { - return - } - dst := bpool.Get(int(lp.srcLen)) - n, _ := lz4.UncompressBlock(lp.data, dst) - - bpool.Put(lp.data) - lp.data = dst[:n] - lp.srcLen = 0 -} - type lpIterator struct { *ListPack index int } func (lp *ListPack) Iterator() *lpIterator { - if lp.srcLen > 0 { - lp.Decompress() - } return &lpIterator{ListPack: lp} } diff --git a/internal/list/listpack_test.go b/internal/list/listpack_test.go index 5aae556..7e960f8 100644 --- a/internal/list/listpack_test.go +++ b/internal/list/listpack_test.go @@ -129,23 +129,4 @@ func TestListpack(t *testing.T) { it.SeekLast().ReplaceNext("a") assert.Equal(lp2list(lp), []string{"TTTTTT", "TEST2", "TEST3"}) }) - - t.Run("compress", func(t *testing.T) { - lp := NewListPack() - for i := 0; i < 100; i++ { - lp.RPush("A1", "B2", "C3") - } - - before := len(lp.data) - lp.Compress() - lp.Compress() - afterC := len(lp.data) - assert.Less(afterC, before) - - lp.Decompress() - lp.Decompress() - afterD := len(lp.data) - assert.Less(afterC, afterD) - assert.Equal(afterD, before) - }) } diff --git a/main.go b/main.go index f88426e..fe7e8db 100644 --- a/main.go +++ b/main.go @@ -59,7 +59,8 @@ func main() { // register main aeLoop event server.aeLoop.AddRead(server.fd, AcceptHandler, nil) server.aeLoop.AddTimeEvent(AE_NORMAL, 100, EvictExpired, nil) - server.aeLoop.AddTimeEvent(AE_NORMAL, 5000, CheckOutOfMemory, nil) + server.aeLoop.AddTimeEvent(AE_NORMAL, 5*1000, CheckOutOfMemory, nil) + server.aeLoop.AddTimeEvent(AE_NORMAL, 60*1000, SysMonitor, nil) if server.config.AppendOnly { server.aeLoop.AddTimeEvent(AE_NORMAL, 1000, SyncAOF, nil) } diff --git a/rotom.go b/rotom.go index 664c63f..362d876 100644 --- a/rotom.go +++ b/rotom.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "runtime" + "runtime/debug" "github.com/xgzlucario/rotom/internal/dict" "github.com/xgzlucario/rotom/internal/hash" @@ -94,7 +95,6 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) { argsBuf: make([]RESP, 8), } - log.Debug().Msgf("accept client, fd: %d", cfd) server.clients[cfd] = client loop.AddRead(cfd, ReadQueryFromClient, client) } @@ -238,3 +238,18 @@ func CheckOutOfMemory(loop *AeLoop, id int, extra interface{}) { runtime.ReadMemStats(&mem) server.outOfMemory = int(mem.HeapAlloc) > server.config.MaxMemory } + +func SysMonitor(loop *AeLoop, id int, extra interface{}) { + var mem runtime.MemStats + var stat debug.GCStats + + runtime.ReadMemStats(&mem) + debug.ReadGCStats(&stat) + + log.Info(). + Uint64("gcsys", mem.GCSys). + Uint64("heapInuse", mem.HeapInuse). + Uint64("heapObjects", mem.HeapObjects). + Int64("gc", stat.NumGC). + Msgf("[SYS]") +}