Skip to content

Commit

Permalink
perf: linear allocator for fast codec ReadString/ReadBinary (cloudweg…
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored Apr 18, 2024
1 parent d0c247b commit bda9a0d
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 7 deletions.
104 changes: 104 additions & 0 deletions pkg/mem/span.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mem

import (
"math/bits"
"sync/atomic"
)

const (
spanCacheSize = 10
minSpanObject = 128 // 128 B
maxSpanObject = (minSpanObject << spanCacheSize) - 1 // 128 KB
minSpanClass = 8 // = spanClass(minSpanObject)
)

type spanCache struct {
spans [spanCacheSize]*span
}

func NewSpanCache(spanSize int) *spanCache {
c := new(spanCache)
for i := 0; i < len(c.spans); i++ {
c.spans[i] = NewSpan(spanSize)
}
return c
}

func (c *spanCache) Make(n int) []byte {
sclass := spanClass(n) - minSpanClass
if sclass < 0 || sclass >= len(c.spans) {
return make([]byte, n)
}
return c.spans[sclass].Make(n)
}

func (c *spanCache) Copy(buf []byte) (p []byte) {
p = c.Make(len(buf))
copy(p, buf)
return p
}

func NewSpan(size int) *span {
sp := new(span)
sp.size = uint32(size)
sp.buffer = make([]byte, 0, size)
return sp
}

type span struct {
lock uint32
read uint32 // read index of buffer
size uint32 // size of buffer
buffer []byte
}

func (b *span) Make(_n int) []byte {
n := uint32(_n)
if n >= b.size || !atomic.CompareAndSwapUint32(&b.lock, 0, 1) {
// fallback path: make a new byte slice if current goroutine cannot get the lock or n is out of size
return make([]byte, n)
}
START:
b.read += n
// fast path
if b.read <= b.size {
buf := b.buffer[b.read-n : b.read]
atomic.StoreUint32(&b.lock, 0)
return buf
}
// slow path: create a new buffer
b.buffer = make([]byte, b.size)
b.read = 0
goto START
}

func (b *span) Copy(buf []byte) (p []byte) {
p = b.Make(len(buf))
copy(p, buf)
return p
}

// spanClass calc the minimum number of bits required to represent x
// [2^sclass,2^(sclass+1)) bytes in a same span class
func spanClass(size int) int {
if size == 0 {
return 0
}
return bits.Len(uint(size))
}
163 changes: 163 additions & 0 deletions pkg/mem/span_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mem

import (
"fmt"
"sync"
"testing"

"github.com/cloudwego/kitex/internal/test"
)

func TestSpanClass(t *testing.T) {
test.DeepEqual(t, spanClass(minSpanObject), minSpanClass)

test.DeepEqual(t, spanClass(1), 1)

test.DeepEqual(t, spanClass(2), 2)
test.DeepEqual(t, spanClass(3), 2)

test.DeepEqual(t, spanClass(4), 3)
test.DeepEqual(t, spanClass(5), 3)
test.DeepEqual(t, spanClass(6), 3)
test.DeepEqual(t, spanClass(7), 3)

test.DeepEqual(t, spanClass(8), 4)
test.DeepEqual(t, spanClass(9), 4)

test.DeepEqual(t, spanClass(32), 6)
test.DeepEqual(t, spanClass(33), 6)
test.DeepEqual(t, spanClass(63), 6)

test.DeepEqual(t, spanClass(64), 7)
test.DeepEqual(t, spanClass(65), 7)
test.DeepEqual(t, spanClass(127), 7)
test.DeepEqual(t, spanClass(128), 8)
test.DeepEqual(t, spanClass(129), 8)
}

func TestSpan(t *testing.T) {
bc := NewSpan(1024 * 32)

var wg sync.WaitGroup
for c := 0; c < 12; c++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1024; i++ {
buf := []byte("123")
test.DeepEqual(t, bc.Copy(buf), buf)

buf = make([]byte, 32)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 63)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 64)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 32*1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 64*1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 128*1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
}
}()
}
wg.Wait()
}

func TestSpanCache(t *testing.T) {
bc := NewSpanCache(1024 * 32)

var wg sync.WaitGroup
for c := 0; c < 12; c++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1024; i++ {
buf := []byte("123")
test.DeepEqual(t, bc.Copy(buf), buf)

buf = make([]byte, 32)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 63)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 64)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 32*1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 64*1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
buf = make([]byte, 128*1024)
test.DeepEqual(t, len(bc.Copy(buf)), len(buf))
}
}()
}
wg.Wait()
}

var benchStringSizes = []int{
15, 31, 127, // not cached
128, 256, 1024 - 1, 4*1024 - 1,
32*1024 - 1, 128*1024 - 1,
512*1024 - 1, // not cached
}

func BenchmarkSpanCacheCopy(b *testing.B) {
for _, sz := range benchStringSizes {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
from := make([]byte, sz)
sc := NewSpanCache(maxSpanObject * 8)
b.RunParallel(func(pb *testing.PB) {
b.ReportAllocs()
b.ResetTimer()
var buffer []byte
for pb.Next() {
buffer = sc.Copy(from)
buffer[0] = 'a'
buffer[sz-1] = 'z'
}
_ = buffer
})
})
}
}

func BenchmarkMakeAndCopy(b *testing.B) {
for _, sz := range benchStringSizes {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
from := make([]byte, sz)
b.RunParallel(func(pb *testing.PB) {
b.ReportAllocs()
b.ResetTimer()
var buffer []byte
for pb.Next() {
buffer = make([]byte, sz)
copy(buffer, from)
buffer[0] = 'a'
buffer[sz-1] = 'z'
}
_ = buffer
})
})
}
}
17 changes: 10 additions & 7 deletions pkg/protocol/bthrift/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ import (

"github.com/apache/thrift/lib/go/thrift"

"github.com/cloudwego/kitex/pkg/mem"
"github.com/cloudwego/kitex/pkg/remote/codec/perrors"
"github.com/cloudwego/kitex/pkg/utils"
)

// Binary protocol for bthrift.
var Binary binaryProtocol

var _ BTProtocol = binaryProtocol{}
var (
// Binary protocol for bthrift.
Binary binaryProtocol
_ BTProtocol = binaryProtocol{}
spanCache = mem.NewSpanCache(1024 * 1024) // 1MB
)

type binaryProtocol struct{}

Expand Down Expand Up @@ -458,7 +461,8 @@ func (binaryProtocol) ReadString(buf []byte) (value string, length int, err erro
if size < 0 || int(size) > len(buf) {
return value, length, perrors.NewProtocolErrorWithType(thrift.INVALID_DATA, "[ReadString] the string size greater than buf length")
}
value = string(buf[length : length+int(size)])
data := spanCache.Copy(buf[length : length+int(size)])
value = utils.SliceByteToString(data)
length += int(size)
return
}
Expand All @@ -473,8 +477,7 @@ func (binaryProtocol) ReadBinary(buf []byte) (value []byte, length int, err erro
if size < 0 || int(size) > len(buf) {
return value, length, perrors.NewProtocolErrorWithType(thrift.INVALID_DATA, "[ReadBinary] the binary size greater than buf length")
}
value = make([]byte, size)
copy(value, buf[length:length+int(size)])
value = spanCache.Copy(buf[length : length+int(size)])
length += int(size)
return
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/protocol/bthrift/binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package bthrift

import (
"encoding/binary"
"fmt"
"testing"

Expand Down Expand Up @@ -534,3 +535,28 @@ func TestSkip(t *testing.T) {
_, valSet, _, _ := Binary.ReadSetBegin(buf[offset:])
test.Assert(t, valSet == 11)
}

func BenchmarkBinaryProtocolReadString(b *testing.B) {
stringSizes := []int{64, 128, 1024, 4096, 10240, 102400}
strings := 32
for _, sz := range stringSizes {
b.Run(fmt.Sprintf("string_size=%d", sz), func(b *testing.B) {
stringBuffer := make([]byte, sz*strings+4*strings)
start := 0
for i := 0; i < strings; i++ {
binary.BigEndian.PutUint32(stringBuffer[start:start+4], uint32(sz))
start += sz + 4
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < strings; i++ {
_, l, err := Binary.ReadString(stringBuffer)
if l-4 != sz || err != nil {
b.Fatal("read string failed", l, err)
}
}
}
})
}
}

0 comments on commit bda9a0d

Please sign in to comment.