Skip to content

Commit

Permalink
feat: add zrange & zrangeWithScores
Browse files Browse the repository at this point in the history
  • Loading branch information
satoshi-099 committed Aug 1, 2024
1 parent 92de265 commit 3d1f8f9
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 14 deletions.
48 changes: 43 additions & 5 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ var cmdTable []*Command = []*Command{
{"hset", hsetCommand, 3, true},
{"hget", hgetCommand, 2, false},
{"hdel", hdelCommand, 2, true},
{"hgetall", hgetallCommand, 1, false},
{"rpush", rpushCommand, 2, true},
{"lpush", lpushCommand, 2, true},
{"rpop", rpopCommand, 1, true},
{"lpop", lpopCommand, 1, true},
{"lrange", lrangeCommand, 3, false},
{"sadd", saddCommand, 2, true},
{"srem", sremCommand, 2, true},
{"spop", spopCommand, 1, true},
{"zadd", zaddCommand, 3, true},
{"zpopmin", zpopminCommand, 1, true},
{"zrange", zrangeCommand, 3, false},
{"ping", pingCommand, 0, false},
{"hgetall", hgetallCommand, 1, false},
{"lrange", lrangeCommand, 3, false},
{"flushdb", flushdbCommand, 0, true},
// TODO
{"mset", todoCommand, 0, false},
Expand Down Expand Up @@ -405,6 +406,43 @@ func zaddCommand(writer *RESPWriter, args []RESP) {
writer.WriteInteger(newFields)
}

func zrangeCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToStringUnsafe()
start, err := args[1].ToInt()
if err != nil {
writer.WriteError(err)
return
}
stop, err := args[2].ToInt()
if err != nil {
writer.WriteError(err)
return
}
zset, err := fetchZSet(key)
if err != nil {
writer.WriteError(err)
return
}
if stop == -1 {
stop = zset.Len()
}

withScores := len(args) == 4 && strings.EqualFold(args[3].ToStringUnsafe(), "WITHSCORES")
if withScores {
writer.WriteArrayHead((stop - start) * 2)
zset.Range(start, stop, func(key string, score float64) {
writer.WriteBulkString(key)
writer.WriteBulkString(strconv.Itoa(int(score)))
})

} else {
writer.WriteArrayHead(stop - start)
zset.Range(start, stop, func(key string, _ float64) {
writer.WriteBulkString(key)
})
}
}

func zpopminCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()
count := 1
Expand All @@ -417,7 +455,7 @@ func zpopminCommand(writer *RESPWriter, args []RESP) {
}
}

zset, err := fetchZSet(key, true)
zset, err := fetchZSet(key)
if err != nil {
writer.WriteError(err)
return
Expand All @@ -427,8 +465,8 @@ func zpopminCommand(writer *RESPWriter, args []RESP) {
writer.WriteArrayHead(size * 2)
for range size {
key, score := zset.PopMin()
writer.WriteBulk([]byte(key))
writer.WriteBulk([]byte(strconv.Itoa(int(score))))
writer.WriteBulkString(key)
writer.WriteBulkString(strconv.Itoa(int(score)))
}
}

Expand Down
54 changes: 45 additions & 9 deletions command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,23 +178,59 @@ func TestCommand(t *testing.T) {
})

t.Run("zset", func(t *testing.T) {
n, _ := rdb.ZAdd(ctx, "zsRank", redis.Z{Member: "player1"}).Result()
n, _ := rdb.ZAdd(ctx, "rank", redis.Z{Member: "player1"}).Result()
assert.Equal(n, int64(1))

n, _ = rdb.ZAdd(ctx, "zsRank",
n, _ = rdb.ZAdd(ctx, "rank",
redis.Z{Member: "player1", Score: 100},
redis.Z{Member: "player2", Score: 300},
redis.Z{Member: "player3", Score: 100}).Result()
assert.Equal(n, int64(2))

res, _ := rdb.ZPopMin(ctx, "zsRank", 2).Result()
assert.Equal(len(res), 2)
assert.Equal(res[0], redis.Z{Member: "player1", Score: 100})
assert.Equal(res[1], redis.Z{Member: "player3", Score: 100})
// zrange
{
members, _ := rdb.ZRange(ctx, "rank", 0, -1).Result()
assert.Equal(members, []string{"player1", "player3", "player2"})

members, _ = rdb.ZRange(ctx, "rank", 1, 3).Result()
assert.Equal(members, []string{"player3", "player2"})

res, _ = rdb.ZPopMin(ctx, "zsRank").Result()
assert.Equal(len(res), 1)
assert.Equal(res[0], redis.Z{Member: "player2", Score: 300})
members, _ = rdb.ZRange(ctx, "rank", 70, 60).Result()
assert.Nil(members)
}

// zrangeWithScores
{
res, _ := rdb.ZRangeWithScores(ctx, "rank", 0, -1).Result()
assert.Equal(res, []redis.Z{
{Member: "player1", Score: 100},
{Member: "player3", Score: 100},
{Member: "player2", Score: 300},
})

res, _ = rdb.ZRangeWithScores(ctx, "rank", 1, 3).Result()
assert.Equal(res, []redis.Z{
{Member: "player3", Score: 100},
{Member: "player2", Score: 300},
})

res, _ = rdb.ZRangeWithScores(ctx, "rank", 70, 60).Result()
assert.Nil(res)
}

// zpopmin
{
res, _ := rdb.ZPopMin(ctx, "rank", 2).Result()
assert.Equal(res, []redis.Z{
{Member: "player1", Score: 100},
{Member: "player3", Score: 100},
})

res, _ = rdb.ZPopMin(ctx, "rank").Result()
assert.Equal(res, []redis.Z{
{Member: "player2", Score: 300},
})
}
})

t.Run("flushdb", func(t *testing.T) {
Expand Down
69 changes: 69 additions & 0 deletions internal/zset/benchmark/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"flag"
"fmt"
"runtime"
"runtime/debug"
"time"

"github.com/influxdata/tdigest"
"github.com/xgzlucario/rotom/internal/zset"
)

var previousPause time.Duration

func gcPause() time.Duration {
runtime.GC()
var stats debug.GCStats
debug.ReadGCStats(&stats)
pause := stats.PauseTotal - previousPause
previousPause = stats.PauseTotal
return pause
}

func genKey(id int) string {
return fmt.Sprintf("%08x", id)
}

func main() {
c := ""
entries := 0
flag.StringVar(&c, "zset", "zset", "zset to bench.")
flag.IntVar(&entries, "entries", 2000*10000, "number of entries to test.")
flag.Parse()

fmt.Println(c)
fmt.Println("entries:", entries)

debug.SetGCPercent(10)
start := time.Now()
td := tdigest.New()

switch c {
case "zset":
m := zset.NewZSet()
for i := 0; i < entries; i++ {
key := genKey(i)
start := time.Now()
m.Set(key, float64(i))
td.Add(float64(time.Since(start)), 1)
}
}
cost := time.Since(start)

var mem runtime.MemStats
var stat debug.GCStats

runtime.ReadMemStats(&mem)
debug.ReadGCStats(&stat)

fmt.Println("gcsys:", mem.GCSys/1024/1024, "mb")
fmt.Println("heap inuse:", mem.HeapInuse/1024/1024, "mb")
fmt.Println("heap object:", mem.HeapObjects/1024, "k")
fmt.Println("gc:", stat.NumGC)
fmt.Println("pause:", gcPause())
fmt.Println("cost:", cost)
// Compute Quantiles
fmt.Println("999th:", time.Duration(td.Quantile(0.999)))
}
11 changes: 11 additions & 0 deletions internal/zset/zset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ func (z *ZSet) PopMin() (key string, score float64) {
return
}

func (z *ZSet) Range(start, stop int, fn func(key string, score float64)) {
var index int
z.skl.ForEachIf(func(n node, s struct{}) bool {
if index >= start && index < stop {
fn(n.key, n.score)
}
index++
return true
})
}

func (z *ZSet) Len() int {
return z.m.Len()
}

0 comments on commit 3d1f8f9

Please sign in to comment.