Skip to content

Commit

Permalink
fix: add release logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh committed Mar 13, 2024
1 parent 8ba37e5 commit 47406c2
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 23 deletions.
22 changes: 13 additions & 9 deletions pkg/remote/codec/default_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,27 +191,31 @@ func (c *defaultCodec) EncodeMetaAndPayload(ctx context.Context, message remote.
}

// encodeMetaAndPayloadWithCRC32C encodes payload and meta with crc32c checksum of the payload.
func (c *defaultCodec) encodeMetaAndPayloadWithCRC32C(ctx context.Context, message remote.Message, out remote.ByteBuffer, me remote.MetaEncoder) error {
var err error

func (c *defaultCodec) encodeMetaAndPayloadWithCRC32C(ctx context.Context, message remote.Message, out remote.ByteBuffer, me remote.MetaEncoder) (err error) {
var (
payloadOut = netpoll.NewWriterByteBuffer(netpoll2.NewLinkBuffer())
needRelease = true
)
defer func() {
if needRelease {
payloadOut.Release(err)
}
}()
// 1. encode payload and calculate crc32c checksum
payloadOut := netpoll.NewWriterByteBuffer(netpoll2.NewLinkBuffer())
if err = me.EncodePayload(ctx, message, payloadOut); err != nil {
return err
}
// get the payload from buffer
payload, payloadLen, err := payloadOut.(remote.NocopyRead).GetBytesNoCopy()
if err != nil {
// release if err
payloadOut.Release(err)
return err
}
crc32c := getCRC32C(payload)
strInfo := message.TransInfo().TransStrInfo()
if crc32c != "" && strInfo != nil {
strInfo[transmeta.HeaderCRC32C] = crc32c
}
// set payload length before encode TTHeader.
// set payload length before encode TTHeader
message.SetPayloadLen(payloadLen)

// 2. encode header and return totalLenField if needed
Expand All @@ -223,8 +227,9 @@ func (c *defaultCodec) encodeMetaAndPayloadWithCRC32C(ctx context.Context, messa
// 3. write payload to the buffer after TTHeader
if netpoll.IsNetpollByteBuffer(out) {
// append buffer only if the input buffer is a netpollByteBuffer
// release will be executed in AppendBuffer
// release will be executed in AppendBuffer, and thus set needRelease to false
err = out.AppendBuffer(payloadOut)
needRelease = false
} else {
// convert [][]byte to []byte
p := flatten2DSlice(payload, payloadLen)
Expand All @@ -233,7 +238,6 @@ func (c *defaultCodec) encodeMetaAndPayloadWithCRC32C(ctx context.Context, messa
} else {
_, err = out.WriteBinary(p)
}
payloadOut.Release(err)
}
return err
}
Expand Down
35 changes: 22 additions & 13 deletions pkg/remote/codec/default_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,11 @@ func TestDefaultSizedCodec_Encode_Decode(t *testing.T) {
}

func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
payloadLen := 1024 * 1024
// netpoll
crc32CodecTest(
t,
payloadLen,
func() remote.ByteBuffer {
return netpoll.NewReaderWriterByteBuffer(netpoll2.NewLinkBuffer())
},
Expand All @@ -243,6 +245,7 @@ func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
// non-netpoll, without writeDirect
crc32CodecTest(
t,
payloadLen,
func() remote.ByteBuffer {
return remote.NewWriterBuffer(0)
},
Expand All @@ -254,6 +257,7 @@ func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
// non-netpoll, with writeDirect
crc32CodecTest(
t,
payloadLen,
func() remote.ByteBuffer {
return NewMockNocopyWriter(netpoll.NewReaderWriterByteBuffer(netpoll2.NewLinkBuffer()))
},
Expand All @@ -263,14 +267,14 @@ func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
)
}

func crc32CodecTest(t *testing.T, outBufferBuilder func() remote.ByteBuffer, inBufferBuilder func([]byte) remote.ByteBuffer) {
func crc32CodecTest(t *testing.T, payloadLen int, outBufferBuilder func() remote.ByteBuffer, inBufferBuilder func([]byte) remote.ByteBuffer) {
remote.PutPayloadCode(serviceinfo.Thrift, mpc)

dc := NewDefaultCodecWithConfig(CodecConfig{CRC32Check: true})
ctx := context.Background()
intKVInfo := prepareIntKVInfo()
strKVInfo := prepareStrKVInfo()
sendMsg := initClientSendMsg(transport.TTHeaderFramed, 32*1024)
sendMsg := initClientSendMsg(transport.TTHeaderFramed, payloadLen)
sendMsg.TransInfo().PutTransIntInfo(intKVInfo)
sendMsg.TransInfo().PutTransStrInfo(strKVInfo)

Expand Down Expand Up @@ -356,7 +360,7 @@ func BenchmarkDefaultEncodeDecode(b *testing.B) {
codec := f()
sendMsg := initClientSendMsg(transport.TTHeader, msgLen)
// encode
out := remote.NewWriterBuffer(1024)
out := netpoll.NewWriterByteBuffer(netpoll2.NewLinkBuffer())
err := codec.Encode(ctx, sendMsg, out)
test.Assert(b, err == nil, err)

Expand Down Expand Up @@ -486,24 +490,29 @@ func TestCornerCase(t *testing.T) {
}

func TestFlatten2DSlice(t *testing.T) {
row, column := 10, 10
b2, expectedB1 := generateSlices(row, column)
length := row * column
actualB1 := flatten2DSlice(b2, length)
test.Assert(t, len(actualB1) == length)
for i := 0; i < length; i++ {
test.Assert(t, actualB1[i] == expectedB1[i])
}
}

func generateSlices(row, column int) ([][]byte, []byte) {
var (
b2 [][]byte
expectedB1 []byte
b2 [][]byte
b1 []byte
)
row, column := 10, 10
for i := 0; i < row; i++ {
var b []byte
for j := 0; j < column; j++ {
curr := rand.Int()
b = append(b, byte(curr))
expectedB1 = append(expectedB1, byte(curr))
b1 = append(b1, byte(curr))
}
b2 = append(b2, b)
}
length := row * column
actualB1 := flatten2DSlice(b2, length)
test.Assert(t, len(actualB1) == length)
for i := 0; i < length; i++ {
test.Assert(t, actualB1[i] == expectedB1[i])
}
return b2, b1
}
2 changes: 1 addition & 1 deletion pkg/remote/trans/netpoll/bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (b *netpollByteBuffer) Bytes() (buf []byte, err error) {
return lb.Bytes(), nil
}

// GetBytes gets all written bytes and return 2d slice.
// GetBytesNoCopy gets all written bytes and return 2d slice.
// It uses GetBytes() of link buffer.
func (b *netpollByteBuffer) GetBytesNoCopy() ([][]byte, int, error) {
lb := b.writer.(*netpoll.LinkBuffer)
Expand Down
6 changes: 6 additions & 0 deletions pkg/remote/trans/netpoll/bytebuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,9 @@ func TestBytes(t *testing.T) {
test.Assert(t, actual[i] == b[i])
}
}

func TestRelease(t *testing.T) {
buf := NewReaderWriterByteBuffer(netpoll.NewLinkBuffer())
buf.Release(nil)
buf.Release(nil)
}

0 comments on commit 47406c2

Please sign in to comment.