From 4de4c4030ae543566563b68cabbbf461884165cf Mon Sep 17 00:00:00 2001 From: xgzlucario <912156837@qq.com> Date: Sat, 3 Aug 2024 02:07:05 +0800 Subject: [PATCH] feat: client querybuf supports dynamic growup --- aof.go | 1 + command.go | 2 +- command_test.go | 49 +++++++++++++++++++++++++++++++++++++++++---- rotom.go | 53 ++++++++++++++++++++++++++++++++++--------------- 4 files changed, 84 insertions(+), 21 deletions(-) diff --git a/aof.go b/aof.go index 70f655f..220395f 100644 --- a/aof.go +++ b/aof.go @@ -11,6 +11,7 @@ import ( const ( KB = 1024 MB = 1024 * KB + GB = 1024 * MB ) // Aof manages an append-only file system for storing data. diff --git a/command.go b/command.go index 451fa8d..be12f77 100644 --- a/command.go +++ b/command.go @@ -50,7 +50,7 @@ var cmdTable []*Command = []*Command{ {"srem", sremCommand, 2, true}, {"spop", spopCommand, 1, true}, {"zadd", zaddCommand, 3, true}, - {"zrem", zremCommand, 1, true}, + {"zrem", zremCommand, 2, true}, {"zrank", zrankCommand, 2, false}, {"zpopmin", zpopminCommand, 1, true}, {"zrange", zrangeCommand, 3, false}, diff --git a/command_test.go b/command_test.go index 2372500..027ef3d 100644 --- a/command_test.go +++ b/command_test.go @@ -125,12 +125,27 @@ func TestCommand(t *testing.T) { res, _ = rdb.HDel(ctx, "map", "k1", "k2", "k3", "k99").Result() assert.Equal(res, int64(3)) - // error + // error hset _, err := rdb.HSet(ctx, "map").Result() assert.Equal(err.Error(), errInvalidArguments.Error()) _, err = rdb.HSet(ctx, "map", "k1", "v1", "k2").Result() assert.Equal(err.Error(), errInvalidArguments.Error()) + + // err wrong type + rdb.Set(ctx, "key", "value", 0) + + _, err = rdb.HGet(ctx, "key", "field1").Result() + assert.Equal(err.Error(), errWrongType.Error()) + + _, err = rdb.HSet(ctx, "key", "field1", "value1").Result() + assert.Equal(err.Error(), errWrongType.Error()) + + _, err = rdb.HDel(ctx, "key", "field1").Result() + assert.Equal(err.Error(), errWrongType.Error()) + + _, err = rdb.HGetAll(ctx, "key").Result() + assert.Equal(err.Error(), errWrongType.Error()) }) t.Run("list", func(t *testing.T) { @@ -170,6 +185,23 @@ func TestCommand(t *testing.T) { assert.Equal(err, redis.Nil) } + // error wrong type + rdb.Set(ctx, "key", "value", 0) + + _, err = rdb.LPush(ctx, "key", "1").Result() + assert.Equal(err.Error(), errWrongType.Error()) + + _, err = rdb.RPush(ctx, "key", "1").Result() + assert.Equal(err.Error(), errWrongType.Error()) + + _, err = rdb.LPop(ctx, "key").Result() + assert.Equal(err.Error(), errWrongType.Error()) + + _, err = rdb.RPop(ctx, "key").Result() + assert.Equal(err.Error(), errWrongType.Error()) + + _, err = rdb.LRange(ctx, "key", 0, -1).Result() + assert.Equal(err.Error(), errWrongType.Error()) }) t.Run("set", func(t *testing.T) { @@ -303,9 +335,9 @@ func TestCommand(t *testing.T) { wg.Wait() }) - t.Run("largeBody", func(t *testing.T) { - body := make([]byte, 1024*1024) - _, err := rdb.Set(ctx, "large", body, 0).Result() + t.Run("bigKey", func(t *testing.T) { + body := make([]byte, MAX_QUERY_DATA_LEN) + _, err := rdb.Set(ctx, "bigKey", body, 0).Result() assert.NotNil(err) }) @@ -340,3 +372,12 @@ func TestConfig(t *testing.T) { _, err = LoadConfig("go.mod") assert.NotNil(err) } + +func TestReadableSize(t *testing.T) { + assert := assert.New(t) + + assert.Equal(readableSize(50), "50B") + assert.Equal(readableSize(50*KB), "50.0KB") + assert.Equal(readableSize(50*MB), "50.0MB") + assert.Equal(readableSize(50*GB), "50.0GB") +} diff --git a/rotom.go b/rotom.go index 998c7ce..d336237 100644 --- a/rotom.go +++ b/rotom.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "io" "runtime" "runtime/debug" @@ -12,9 +13,10 @@ import ( ) const ( - READ_BUF_SIZE = 16 * KB - WRITE_BUF_SIZE = 4 * KB - MAX_READER_SIZE = 4 * KB + QUERY_BUF_SIZE = 8 * KB + WRITE_BUF_SIZE = 8 * KB + + MAX_QUERY_DATA_LEN = 128 * MB ) type ( @@ -90,7 +92,7 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) { client := &Client{ fd: cfd, replyWriter: NewWriter(WRITE_BUF_SIZE), - queryBuf: make([]byte, READ_BUF_SIZE), + queryBuf: make([]byte, QUERY_BUF_SIZE), argsBuf: make([]RESP, 8), } @@ -100,29 +102,36 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) { func ReadQueryFromClient(loop *AeLoop, fd int, extra interface{}) { client := extra.(*Client) + readSize := 0 - // grow query buffer - if len(client.queryBuf)-client.queryLen < MAX_READER_SIZE { - client.queryBuf = append(client.queryBuf, make([]byte, MAX_READER_SIZE)...) - } - +READ: n, err := Read(fd, client.queryBuf[client.queryLen:]) if err != nil { log.Error().Msgf("client %v read err: %v", fd, err) freeClient(client) return } - if n == MAX_READER_SIZE { - log.Error().Msgf("client %d read query too large, now free", fd) + readSize += n + client.queryLen += n + + if readSize == 0 { freeClient(client) return } - if n == 0 { + + if client.queryLen > MAX_QUERY_DATA_LEN { + log.Error().Msgf("client %d read query data too large, now free", fd) freeClient(client) return } - client.queryLen += n + // queryBuf need grow up + if client.queryLen == len(client.queryBuf) { + client.queryBuf = append(client.queryBuf, make([]byte, client.queryLen)...) + log.Warn().Msgf("client %d queryBuf grow up to size %s", fd, readableSize(len(client.queryBuf))) + goto READ + } + ProcessQueryBuf(client) } @@ -230,9 +239,9 @@ func SysMonitor(loop *AeLoop, id int, extra interface{}) { debug.ReadGCStats(&stat) log.Info(). - Uint64("gcsys", mem.GCSys). - Uint64("heapInuse", mem.HeapInuse). - Uint64("heapObjects", mem.HeapObjects). + Str("gcsys", readableSize(mem.GCSys)). + Str("heapInuse", readableSize(mem.HeapInuse)). + Str("heapObjects", fmt.Sprintf("%.1fk", float64(mem.HeapObjects)/1e3)). Int64("gc", stat.NumGC). Msgf("[SYS]") @@ -245,3 +254,15 @@ func SysMonitor(loop *AeLoop, id int, extra interface{}) { } server.outOfMemory = int(mem.HeapAlloc) > server.config.MaxMemory } + +func readableSize[T int | uint64](sz T) string { + switch { + case sz >= GB: + return fmt.Sprintf("%.1fGB", float64(sz)/float64(GB)) + case sz >= MB: + return fmt.Sprintf("%.1fMB", float64(sz)/float64(MB)) + case sz >= KB: + return fmt.Sprintf("%.1fKB", float64(sz)/float64(KB)) + } + return fmt.Sprintf("%dB", sz) +}