Skip to content

Commit

Permalink
feat: client querybuf supports dynamic growup
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Aug 2, 2024
1 parent f7dca53 commit 4de4c40
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 21 deletions.
1 change: 1 addition & 0 deletions aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
const (
KB = 1024
MB = 1024 * KB
GB = 1024 * MB
)

// Aof manages an append-only file system for storing data.
Expand Down
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var cmdTable []*Command = []*Command{
{"srem", sremCommand, 2, true},
{"spop", spopCommand, 1, true},
{"zadd", zaddCommand, 3, true},
{"zrem", zremCommand, 1, true},
{"zrem", zremCommand, 2, true},
{"zrank", zrankCommand, 2, false},
{"zpopmin", zpopminCommand, 1, true},
{"zrange", zrangeCommand, 3, false},
Expand Down
49 changes: 45 additions & 4 deletions command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,27 @@ func TestCommand(t *testing.T) {
res, _ = rdb.HDel(ctx, "map", "k1", "k2", "k3", "k99").Result()
assert.Equal(res, int64(3))

// error
// error hset
_, err := rdb.HSet(ctx, "map").Result()
assert.Equal(err.Error(), errInvalidArguments.Error())

_, err = rdb.HSet(ctx, "map", "k1", "v1", "k2").Result()
assert.Equal(err.Error(), errInvalidArguments.Error())

// err wrong type
rdb.Set(ctx, "key", "value", 0)

_, err = rdb.HGet(ctx, "key", "field1").Result()
assert.Equal(err.Error(), errWrongType.Error())

_, err = rdb.HSet(ctx, "key", "field1", "value1").Result()
assert.Equal(err.Error(), errWrongType.Error())

_, err = rdb.HDel(ctx, "key", "field1").Result()
assert.Equal(err.Error(), errWrongType.Error())

_, err = rdb.HGetAll(ctx, "key").Result()
assert.Equal(err.Error(), errWrongType.Error())
})

t.Run("list", func(t *testing.T) {
Expand Down Expand Up @@ -170,6 +185,23 @@ func TestCommand(t *testing.T) {
assert.Equal(err, redis.Nil)
}

// error wrong type
rdb.Set(ctx, "key", "value", 0)

_, err = rdb.LPush(ctx, "key", "1").Result()
assert.Equal(err.Error(), errWrongType.Error())

_, err = rdb.RPush(ctx, "key", "1").Result()
assert.Equal(err.Error(), errWrongType.Error())

_, err = rdb.LPop(ctx, "key").Result()
assert.Equal(err.Error(), errWrongType.Error())

_, err = rdb.RPop(ctx, "key").Result()
assert.Equal(err.Error(), errWrongType.Error())

_, err = rdb.LRange(ctx, "key", 0, -1).Result()
assert.Equal(err.Error(), errWrongType.Error())
})

t.Run("set", func(t *testing.T) {
Expand Down Expand Up @@ -303,9 +335,9 @@ func TestCommand(t *testing.T) {
wg.Wait()
})

t.Run("largeBody", func(t *testing.T) {
body := make([]byte, 1024*1024)
_, err := rdb.Set(ctx, "large", body, 0).Result()
t.Run("bigKey", func(t *testing.T) {
body := make([]byte, MAX_QUERY_DATA_LEN)
_, err := rdb.Set(ctx, "bigKey", body, 0).Result()
assert.NotNil(err)
})

Expand Down Expand Up @@ -340,3 +372,12 @@ func TestConfig(t *testing.T) {
_, err = LoadConfig("go.mod")
assert.NotNil(err)
}

func TestReadableSize(t *testing.T) {
assert := assert.New(t)

assert.Equal(readableSize(50), "50B")
assert.Equal(readableSize(50*KB), "50.0KB")
assert.Equal(readableSize(50*MB), "50.0MB")
assert.Equal(readableSize(50*GB), "50.0GB")
}
53 changes: 37 additions & 16 deletions rotom.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"io"
"runtime"
"runtime/debug"
Expand All @@ -12,9 +13,10 @@ import (
)

const (
READ_BUF_SIZE = 16 * KB
WRITE_BUF_SIZE = 4 * KB
MAX_READER_SIZE = 4 * KB
QUERY_BUF_SIZE = 8 * KB
WRITE_BUF_SIZE = 8 * KB

MAX_QUERY_DATA_LEN = 128 * MB
)

type (
Expand Down Expand Up @@ -90,7 +92,7 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) {
client := &Client{
fd: cfd,
replyWriter: NewWriter(WRITE_BUF_SIZE),
queryBuf: make([]byte, READ_BUF_SIZE),
queryBuf: make([]byte, QUERY_BUF_SIZE),
argsBuf: make([]RESP, 8),
}

Expand All @@ -100,29 +102,36 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) {

func ReadQueryFromClient(loop *AeLoop, fd int, extra interface{}) {
client := extra.(*Client)
readSize := 0

// grow query buffer
if len(client.queryBuf)-client.queryLen < MAX_READER_SIZE {
client.queryBuf = append(client.queryBuf, make([]byte, MAX_READER_SIZE)...)
}

READ:
n, err := Read(fd, client.queryBuf[client.queryLen:])
if err != nil {
log.Error().Msgf("client %v read err: %v", fd, err)
freeClient(client)
return
}
if n == MAX_READER_SIZE {
log.Error().Msgf("client %d read query too large, now free", fd)
readSize += n
client.queryLen += n

if readSize == 0 {
freeClient(client)
return
}
if n == 0 {

if client.queryLen > MAX_QUERY_DATA_LEN {
log.Error().Msgf("client %d read query data too large, now free", fd)
freeClient(client)
return
}

client.queryLen += n
// queryBuf need grow up
if client.queryLen == len(client.queryBuf) {
client.queryBuf = append(client.queryBuf, make([]byte, client.queryLen)...)
log.Warn().Msgf("client %d queryBuf grow up to size %s", fd, readableSize(len(client.queryBuf)))
goto READ
}

ProcessQueryBuf(client)
}

Expand Down Expand Up @@ -230,9 +239,9 @@ func SysMonitor(loop *AeLoop, id int, extra interface{}) {
debug.ReadGCStats(&stat)

log.Info().
Uint64("gcsys", mem.GCSys).
Uint64("heapInuse", mem.HeapInuse).
Uint64("heapObjects", mem.HeapObjects).
Str("gcsys", readableSize(mem.GCSys)).
Str("heapInuse", readableSize(mem.HeapInuse)).
Str("heapObjects", fmt.Sprintf("%.1fk", float64(mem.HeapObjects)/1e3)).
Int64("gc", stat.NumGC).
Msgf("[SYS]")

Expand All @@ -245,3 +254,15 @@ func SysMonitor(loop *AeLoop, id int, extra interface{}) {
}
server.outOfMemory = int(mem.HeapAlloc) > server.config.MaxMemory
}

func readableSize[T int | uint64](sz T) string {
switch {
case sz >= GB:
return fmt.Sprintf("%.1fGB", float64(sz)/float64(GB))
case sz >= MB:
return fmt.Sprintf("%.1fMB", float64(sz)/float64(MB))
case sz >= KB:
return fmt.Sprintf("%.1fKB", float64(sz)/float64(KB))
}
return fmt.Sprintf("%dB", sz)
}

0 comments on commit 4de4c40

Please sign in to comment.