Skip to content

Commit

Permalink
feat: add ttl related api for dict
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jul 25, 2024
1 parent 00d8c16 commit e993a1b
Show file tree
Hide file tree
Showing 17 changed files with 130 additions and 163 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ AeLoop(Async Event Loop) 是 Redis 的核心异步事件驱动机制,主要有

### 数据结构

rotom 在数据结构上做了许多优化,当 hash 和 set 较小时,使用空间紧凑的 `zipmap``zipset` 以优化内存效率,并在适时使用 `lz4` 压缩算法压缩较冷数据,以进一步节省内存。

其中 `zipmap``zipset` 以及 `quicklist` 都基于 `listpack`, 这是 Redis 7.0+ 提出的新型压缩列表,支持正序及逆序遍历。
rotom 在数据结构上做了许多优化,当 hash 和 set 较小时,使用空间紧凑的 `zipmap``zipset` 以优化内存效率。它们都基于 `listpack`, 这是 Redis 5.0+ 提出的新型压缩列表,支持正序及逆序遍历。

### 计划

Expand Down
12 changes: 6 additions & 6 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func setCommand(writer *RESPWriter, args []RESP) {
func incrCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()

object, ok := db.dict.Get(key)
if !ok {
object, ttl := db.dict.Get(key)
if ttl == dict.KEY_NOT_EXIST {
db.dict.Set(key, dict.TypeInt, 1)
writer.WriteInteger(1)
return
Expand Down Expand Up @@ -128,8 +128,8 @@ func incrCommand(writer *RESPWriter, args []RESP) {
func getCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToStringUnsafe()

object, ok := db.dict.Get(key)
if !ok {
object, ttl := db.dict.Get(key)
if ttl == dict.KEY_NOT_EXIST {
writer.WriteNull()
return
}
Expand Down Expand Up @@ -431,8 +431,8 @@ func fetchZSet(key string, setnx ...bool) (ZSet, error) {
}

func fetch[T any](key string, typ dict.Type, new func() T, setnx ...bool) (v T, err error) {
object, ok := db.dict.Get(key)
if ok {
object, ttl := db.dict.Get(key)
if ttl != dict.KEY_NOT_EXIST {
if object.Type() != typ {
return v, errWrongType
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964
github.com/deckarep/golang-set/v2 v2.6.0
github.com/influxdata/tdigest v0.0.1
github.com/pierrec/lz4/v4 v4.1.21
github.com/redis/go-redis/v9 v9.5.2
github.com/rs/zerolog v1.33.0
github.com/sakeven/RbTree v1.1.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
83 changes: 55 additions & 28 deletions internal/dict/dict.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"github.com/cockroachdb/swiss"
)

const (
TTL_DEFAULT = -1
KEY_NOT_EXIST = -2
)

var (
_sec atomic.Uint32
_nsec atomic.Int64
Expand Down Expand Up @@ -36,32 +41,26 @@ func New() *Dict {
}
}

func (dict *Dict) Get(key string) (*Object, bool) {
func (dict *Dict) Get(key string) (*Object, int) {
object, ok := dict.data.Get(key)
if !ok {
return nil, false
// key not exist
return nil, KEY_NOT_EXIST
}

object.lastAccessd = _sec.Load()
if object.hasTTL {
nsecTTL, ok := dict.expire.Get(key)
if !ok || nsecTTL < _nsec.Load() {
// expired
nsec, _ := dict.expire.Get(key)
// key expired
if nsec < _nsec.Load() {
dict.data.Delete(key)
dict.expire.Delete(key)
return nil, false
return nil, KEY_NOT_EXIST
}
return object, nsec2duration(nsec)
}

switch object.typ {
case TypeZipMapC, TypeZipSetC:
object.data.(Compressor).Decompress()
object.typ -= 1
}

// update access time
object.lastAccessd = _sec.Load()

return object, true
return object, TTL_DEFAULT
}

func (dict *Dict) Set(key string, typ Type, data any) {
Expand All @@ -72,28 +71,56 @@ func (dict *Dict) Set(key string, typ Type, data any) {
})
}

func (dict *Dict) Remove(key string) bool {
_, ok := dict.data.Get(key)
dict.data.Delete(key)
dict.expire.Delete(key)
return ok
func (dict *Dict) SetWithTTL(key string, typ Type, data any, ttl int64) {
dict.data.Put(key, &Object{
typ: typ,
lastAccessd: _sec.Load(),
data: data,
hasTTL: true,
})
dict.expire.Put(key, ttl)
}

func (dict *Dict) SetTTL(key string, expiration int64) bool {
func (dict *Dict) Delete(key string) bool {
object, ok := dict.data.Get(key)
if !ok {
return false
}
object.hasTTL = true
dict.expire.Put(key, expiration)
dict.data.Delete(key)
if object.hasTTL {
dict.expire.Delete(key)
}
return true
}

// SetTTL set expire time for key.
// return `0` if key not exist or expired.
// return `1` if set successed.
func (dict *Dict) SetTTL(key string, ttl int64) int {
object, ok := dict.data.Get(key)
if !ok {
// key not exist
return 0
}
if object.hasTTL {
nsec, _ := dict.expire.Get(key)
// key expired
if nsec < _nsec.Load() {
dict.data.Delete(key)
dict.expire.Delete(key)
return 0
}
}
// set ttl
object.hasTTL = true
dict.expire.Put(key, ttl)
return 1
}

func (dict *Dict) EvictExpired() {
nanosec := time.Now().UnixNano()
count := 0
dict.expire.All(func(key string, value int64) bool {
if nanosec > value {
var count int
dict.expire.All(func(key string, nsec int64) bool {
if _nsec.Load() > nsec {
dict.expire.Delete(key)
dict.data.Delete(key)
}
Expand Down
66 changes: 42 additions & 24 deletions internal/dict/dict_test.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,58 @@
package dict

import (
"fmt"
"math/rand/v2"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func genKV(i int) (string, []byte) {
k := fmt.Sprintf("%08x", i)
return k, []byte(k)
}

func TestDict(t *testing.T) {
assert := assert.New(t)
dict := New()

dict.Set("key1", TypeString, []byte("hello"))
object, ok := dict.Get("key1")
assert.True(ok)
assert.Equal(object.Data(), []byte("hello"))
assert.Equal(object.Type(), TypeString)
}
t.Run("set", func(t *testing.T) {
dict := New()
dict.Set("key", TypeString, []byte("hello"))

func TestDictMultiSet(t *testing.T) {
assert := assert.New(t)
dict := New()
object, ttl := dict.Get("key")
assert.Equal(ttl, TTL_DEFAULT)
assert.Equal(object.Data(), []byte("hello"))
assert.Equal(object.Type(), TypeString)

object, ttl = dict.Get("none")
assert.Nil(object)
assert.Equal(ttl, KEY_NOT_EXIST)
})

t.Run("setTTL", func(t *testing.T) {
dict := New()
dict.SetWithTTL("key", TypeString, []byte("hello"), time.Now().Add(time.Minute).UnixNano())

for i := 0; i < 10000; i++ {
key, value := genKV(rand.Int())
dict.Set(key, TypeString, value)
object, ttl := dict.Get("key")
assert.Equal(ttl, 60)
assert.Equal(object.Data(), []byte("hello"))
assert.Equal(object.Type(), TypeString)

object, ok := dict.Get(key)
ttl = dict.SetTTL("key", time.Now().Add(-time.Second).UnixNano())
assert.Equal(ttl, 1)

ttl = dict.SetTTL("not-exist", TTL_DEFAULT)
assert.Equal(ttl, 0)

// expired
object, ttl = dict.Get("key")
assert.Equal(ttl, KEY_NOT_EXIST)
assert.Nil(object)
})

t.Run("delete", func(t *testing.T) {
dict := New()
dict.Set("key", TypeString, []byte("hello"))

ok := dict.Delete("key")
assert.True(ok)
assert.Equal(object.typ, TypeString)
assert.Equal(object.data.([]byte), value)
}

ok = dict.Delete("none")
assert.False(ok)
})
}
8 changes: 6 additions & 2 deletions internal/dict/object.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dict

import "time"

// Type defines all rotom data types.
type Type byte

Expand All @@ -8,10 +10,8 @@ const (
TypeInt
TypeMap
TypeZipMap
TypeZipMapC // compressed zipmap
TypeSet
TypeZipSet
TypeZipSetC // compressed zipset
TypeList
TypeZSet
)
Expand All @@ -34,3 +34,7 @@ func (o *Object) Type() Type { return o.typ }
func (o *Object) Data() any { return o.data }

func (o *Object) SetData(data any) { o.data = data }

func nsec2duration(nsec int64) (second int) {
return int(nsec-_nsec.Load()) / int(time.Second)
}
11 changes: 0 additions & 11 deletions internal/hash/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,6 @@ func main() {
}
m[i] = hm
}

case "zipmap-compressed":
for i := 0; i < 10000; i++ {
hm := hash.NewZipMap()
for i := 0; i < 512; i++ {
k, v := genKV(i)
hm.Set(k, v)
}
hm.Compress()
m[i] = hm
}
}
cost := time.Since(start)

Expand Down
3 changes: 0 additions & 3 deletions internal/hash/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ func TestToMap(t *testing.T) {
m.Set("key2", []byte("value2"))
m.Set("key3", []byte("value3"))

m.Compress()
m.Decompress()

nm := m.ToMap()

// scan
Expand Down
3 changes: 0 additions & 3 deletions internal/hash/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ func TestToSet(t *testing.T) {
m.Add("key2")
m.Add("key3")

m.Compress()
m.Decompress()

nm := m.ToSet()

// scan
Expand Down
4 changes: 0 additions & 4 deletions internal/hash/zipmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ func (zm *ZipMap) ToMap() *Map {

func (zm *ZipMap) Len() int { return zm.m.Size() }

func (zm *ZipMap) Compress() { zm.m.Compress() }

func (zm *ZipMap) Decompress() { zm.m.Decompress() }

func b2s(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
4 changes: 0 additions & 4 deletions internal/hash/zipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ func (zs *ZipSet) Pop() (string, bool) { return zs.m.RPop() }

func (zs *ZipSet) Len() int { return zs.m.Size() }

func (zs *ZipSet) Compress() { zs.m.Compress() }

func (zs *ZipSet) Decompress() { zs.m.Decompress() }

func (zs *ZipSet) ToSet() *Set {
s := NewSet()
zs.Scan(func(key string) {
Expand Down
14 changes: 0 additions & 14 deletions internal/list/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,3 @@ func BenchmarkList(b *testing.B) {
}
})
}

func BenchmarkCompress(b *testing.B) {
b.Run("compress", func(b *testing.B) {
lp := NewListPack()
for i := 0; i < 256; i++ {
lp.RPush("Hello", "World")
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
lp.Compress()
lp.Decompress()
}
})
}
Loading

0 comments on commit e993a1b

Please sign in to comment.