From 5c7a71208065101a59f3a8eb43b8abcc95c24361 Mon Sep 17 00:00:00 2001 From: xgzlucario <912156837@qq.com> Date: Sun, 4 Aug 2024 15:29:59 +0800 Subject: [PATCH] perf: RESP & aeloop performance optimization --- README.md | 2 +- README_CN.md | 2 +- ae.go | 5 ++++- command.go | 28 +++++++++----------------- resp.go | 57 +++++++++++++++++++++++----------------------------- resp_test.go | 44 ++++++++++++++++++++-------------------- 6 files changed, 63 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index 2d76816..efc5014 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ English | [中文](README_CN.md) ## Introduction -This is rotom, a tiny Redis Server written in Go. It replicates the core event loop mechanism AeLoop in Redis based on I/O multiplexing. +This is rotom, a high performance, low latency tiny Redis Server written in Go. It replicates the core event loop mechanism AeLoop in Redis based on I/O multiplexing. ## Features diff --git a/README_CN.md b/README_CN.md index d56cc4c..83202d7 100644 --- a/README_CN.md +++ b/README_CN.md @@ -6,7 +6,7 @@ ## 介绍 -这里是 rotom,一个使用 Go 编写的 tiny Redis Server。基于 IO 多路复用还原了 Redis 中的 AeLoop 核心事件循环机制。 +这里是 rotom,一个使用 Go 编写的高性能,低延迟的 tiny Redis Server。基于 IO 多路复用还原了 Redis 中的 AeLoop 核心事件循环机制。 ## 特性 diff --git a/ae.go b/ae.go index 414b132..8c16f88 100644 --- a/ae.go +++ b/ae.go @@ -37,6 +37,8 @@ type AeLoop struct { fileEventFd int timeEventNextId int stop bool + + _fevents []*AeFileEvent // fes cache } func (loop *AeLoop) AddRead(fd int, proc FileProc, extra interface{}) { @@ -139,6 +141,7 @@ func AeLoopCreate() (*AeLoop, error) { fileEventFd: epollFd, timeEventNextId: 1, stop: false, + _fevents: make([]*AeFileEvent, 128), // pre alloc }, nil } @@ -171,7 +174,7 @@ retry: } // collect file events - fes = make([]*AeFileEvent, 0, n) + fes = loop._fevents[:0] for _, ev := range events[:n] { if ev.Events&unix.EPOLLIN != 0 { fe := loop.FileEvents[int(ev.Fd)] diff --git a/command.go b/command.go index 331e053..b684878 100644 --- a/command.go +++ b/command.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "fmt" "strconv" "strings" @@ -13,7 +12,7 @@ import ( ) var ( - WITH_SCORES = []byte("WITHSCORES") + WITH_SCORES = "WITHSCORES" ) type Command struct { @@ -65,10 +64,6 @@ func equalFold(a, b string) bool { return len(a) == len(b) && strings.EqualFold(a, b) } -func equalFoldBytes(a, b []byte) bool { - return len(a) == len(b) && bytes.EqualFold(a, b) -} - func lookupCommand(name string) (*Command, error) { for _, c := range cmdTable { if equalFold(name, c.name) { @@ -98,11 +93,11 @@ func setCommand(writer *RESPWriter, args []RESP) { } func incrCommand(writer *RESPWriter, args []RESP) { - key := args[0].ToString() + key := args[0].ToStringUnsafe() object, ttl := db.dict.Get(key) if ttl == dict.KEY_NOT_EXIST { - db.dict.Set(key, 1) + db.dict.Set(strings.Clone(key), 1) writer.WriteInteger(1) return } @@ -114,6 +109,7 @@ func incrCommand(writer *RESPWriter, args []RESP) { writer.WriteInteger(num) case dict.TypeString: + // conv to integer bytes := object.Data().([]byte) num, err := RESP(bytes).ToInt() if err != nil { @@ -345,14 +341,12 @@ func saddCommand(writer *RESPWriter, args []RESP) { } func sremCommand(writer *RESPWriter, args []RESP) { - key := args[0].ToString() - + key := args[0].ToStringUnsafe() set, err := fetchSet(key) if err != nil { writer.WriteError(err) return } - var count int for _, arg := range args[1:] { if set.Remove(arg.ToStringUnsafe()) { @@ -363,17 +357,15 @@ func sremCommand(writer *RESPWriter, args []RESP) { } func spopCommand(writer *RESPWriter, args []RESP) { - key := args[0].ToString() - + key := args[0].ToStringUnsafe() set, err := fetchSet(key) if err != nil { writer.WriteError(err) return } - - item, ok := set.Pop() + member, ok := set.Pop() if ok { - writer.WriteBulkString(item) + writer.WriteBulkString(member) } else { writer.WriteNull() } @@ -461,7 +453,7 @@ func zrangeCommand(writer *RESPWriter, args []RESP) { } start = min(start, stop) - withScores := len(args) == 4 && equalFoldBytes(args[3], WITH_SCORES) + withScores := len(args) == 4 && equalFold(args[3].ToStringUnsafe(), WITH_SCORES) if withScores { writer.WriteArrayHead((stop - start) * 2) zset.Range(start, stop, func(key string, score float64) { @@ -478,7 +470,7 @@ func zrangeCommand(writer *RESPWriter, args []RESP) { } func zpopminCommand(writer *RESPWriter, args []RESP) { - key := args[0].ToString() + key := args[0].ToStringUnsafe() count := 1 var err error if len(args) > 1 { diff --git a/resp.go b/resp.go index b9ca1f9..c18e25a 100644 --- a/resp.go +++ b/resp.go @@ -1,8 +1,8 @@ package main import ( + "bytes" "io" - "slices" "strconv" "unsafe" ) @@ -28,20 +28,23 @@ func NewReader(input []byte) *RESPReader { return &RESPReader{b: input} } -// cutByCRLF splits the buffer by the first occurrence of CRLF. -func cutByCRLF(buf []byte) (before, after []byte, found bool) { - n := len(buf) - if n <= 2 { - return - } - for i, b := range buf[:n-1] { +// parseInt parse first integer from buf. +// input "3\r\nHELLO" -> (3, "HELLO", nil). +func parseInt(buf []byte) (n int, after []byte, err error) { + for i, b := range buf { + if b >= '0' && b <= '9' { + n = n*10 + int(b-'0') + continue + } if b == '\r' { - if buf[i+1] == '\n' { - return buf[:i], buf[i+2:], true + if len(buf) > i+1 && buf[i+1] == '\n' { + return n, buf[i+2:], nil } + break } + return 0, nil, errParseInteger } - return + return 0, nil, errCRLFNotFound } // ReadNextCommand reads the next RESP command from the RESPReader. @@ -55,45 +58,37 @@ func (r *RESPReader) ReadNextCommand(argsBuf []RESP) (args []RESP, err error) { switch r.b[0] { case ARRAY: // command_bulk format - before, after, ok := cutByCRLF(r.b[1:]) - if !ok { - return nil, errCRLFNotFound - } - count, err := strconv.Atoi(b2s(before)) + num, after, err := parseInt(r.b[1:]) if err != nil { return nil, err } r.b = after // read bulk strings for range - for i := 0; i < count; i++ { + for i := 0; i < num; i++ { if len(r.b) == 0 || r.b[0] != BULK { return nil, errInvalidArguments } - // read CRLF - before, after, ok := cutByCRLF(r.b[1:]) - if !ok { - return nil, errCRLFNotFound - } - count, err := strconv.Atoi(b2s(before)) + num, after, err := parseInt(r.b[1:]) if err != nil { return nil, err } - r.b = after // bound check - if count < 0 || count+2 > len(r.b) { + if num < 0 || num+2 > len(after) { return nil, errInvalidArguments } - args = append(args, r.b[:count]) - r.b = r.b[count+2:] + args = append(args, after[:num]) + + // skip CRLF + r.b = after[num+2:] } default: // command_inline format - before, after, ok := cutByCRLF(r.b) + before, after, ok := bytes.Cut(r.b, CRLF) if !ok { return nil, errInvalidArguments } @@ -179,8 +174,6 @@ func (r RESP) ToInt() (int, error) { return strconv.Atoi(b2s(r)) } func (r RESP) ToFloat() (float64, error) { return strconv.ParseFloat(b2s(r), 64) } -func (r RESP) Clone() []byte { return slices.Clone(r) } +func (r RESP) Clone() []byte { return bytes.Clone(r) } -func b2s(b []byte) string { - return *(*string)(unsafe.Pointer(&b)) -} +func b2s(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } diff --git a/resp_test.go b/resp_test.go index 9a4a64a..7673c2b 100644 --- a/resp_test.go +++ b/resp_test.go @@ -61,26 +61,26 @@ func TestReader(t *testing.T) { } }) - t.Run("cutByCRLF", func(t *testing.T) { - before, after, ok := cutByCRLF([]byte("123\r\n456")) - assert.Equal(string(before), "123") - assert.Equal(string(after), "456") - assert.True(ok) - - before, after, ok = cutByCRLF([]byte("1234\r\n5678")) - assert.Equal(string(before), "1234") - assert.Equal(string(after), "5678") - assert.True(ok) - - // error cases - _, _, ok = cutByCRLF([]byte("A")) - assert.False(ok) - - _, _, ok = cutByCRLF([]byte("ABC")) - assert.False(ok) - - _, _, ok = cutByCRLF([]byte("1234\r")) - assert.False(ok) + t.Run("parseInt", func(t *testing.T) { + n, after, err := parseInt([]byte("3\r\nHELLO")) + assert.Equal(n, 3) + assert.Equal(after, []byte("HELLO")) + assert.Nil(err) + + n, after, err = parseInt([]byte("003\r\nHELLO")) + assert.Equal(n, 3) + assert.Equal(after, []byte("HELLO")) + assert.Nil(err) + + // errors + _, _, err = parseInt([]byte("ABC\r\nHELLO")) + assert.ErrorIs(err, errParseInteger) + + _, _, err = parseInt([]byte("1234567\r")) + assert.ErrorIs(err, errCRLFNotFound) + + _, _, err = parseInt([]byte("1234567")) + assert.ErrorIs(err, errCRLFNotFound) }) t.Run("command-bulk", func(t *testing.T) { @@ -93,11 +93,11 @@ func TestReader(t *testing.T) { // error args, err = NewReader([]byte("*A\r\n$3\r\nGET\r\n$3\r\nfoo\r\n")).ReadNextCommand(nil) assert.Equal(len(args), 0) - assert.NotNil(err) + assert.ErrorIs(err, errParseInteger) args, err = NewReader([]byte("*3\r\n$A\r\nGET\r\n$3\r\nfoo\r\n")).ReadNextCommand(nil) assert.Equal(len(args), 0) - assert.NotNil(err) + assert.ErrorIs(err, errParseInteger) args, err = NewReader([]byte("*3\r\n+PING")).ReadNextCommand(nil) assert.Equal(len(args), 0)