Skip to content

Commit

Permalink
feat: add crc32c check for payload
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh committed Feb 23, 2024
1 parent 7755c15 commit f063ab6
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudwego/fastpb v0.0.4
github.com/cloudwego/frugal v0.1.13
github.com/cloudwego/localsession v0.0.2
github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b
github.com/cloudwego/netpoll v0.5.2-0.20240223102227-0c594e3c8163
github.com/cloudwego/thriftgo v0.3.6
github.com/golang/mock v1.6.0
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiX
github.com/cloudwego/netpoll v0.2.4/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/cloudwego/netpoll v0.3.1/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/cloudwego/netpoll v0.4.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.5.1 h1:zDUF7xF0C97I10fGlQFJ4jg65khZZMUvSu/TWX44Ohc=
github.com/cloudwego/netpoll v0.5.1/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b h1:ZHtA1Q20H9WoLPfMHCSkMv8wUrN7YENJfQCVybErGy8=
github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.5.2-0.20240223102227-0c594e3c8163 h1:HGsD9cVt4x/tR3YfGLWSwYg9QGexudQdmIy/xnUaCJE=
github.com/cloudwego/netpoll v0.5.2-0.20240223102227-0c594e3c8163/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/thriftgo v0.1.2/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cloudwego/thriftgo v0.2.4/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
github.com/cloudwego/thriftgo v0.2.7/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
Expand Down
37 changes: 23 additions & 14 deletions pkg/remote/codec/default_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,27 +205,20 @@ func (c *defaultCodec) encodeMetaAndPayloadWithCRC32C(ctx context.Context, messa
return err
}
// get the payload from buffer
var payload []byte
payload, err = payloadOut.Bytes()
payload, payloadLen, err := payloadOut.(interface{ GetBytes() ([][]byte, int, error) }).GetBytes()
if err != nil {
// release if err
payloadOut.Release(err)
return err
}
outIsNetpollBuffer := netpoll.IsNetpollByteBuffer(out)
if !outIsNetpollBuffer {
// release payloadOut if the original out is not a netpoll buffer
// because it won't be used later.
payloadOut.Release(nil)
}

crc32c := getCRC32C(payload)
strInfo := message.TransInfo().TransStrInfo()
if crc32c != "" && strInfo != nil {
strInfo[transmeta.HeaderCRC32C] = crc32c
}
// set payload length before encode TTHeader.
message.SetPayloadLen(len(payload))
message.SetPayloadLen(payloadLen)

// 2. encode header and return totalLenField if needed
// In this case, set total length during TTHeader encode
Expand All @@ -234,15 +227,17 @@ func (c *defaultCodec) encodeMetaAndPayloadWithCRC32C(ctx context.Context, messa
}

// 3. write payload to the buffer after TTHeader
if outIsNetpollBuffer {
if netpoll.IsNetpollByteBuffer(out) {
// append buffer only if the input buffer is a netpollByteBuffer
// release will be executed in AppendBuffer
err = out.AppendBuffer(payloadOut)
} else {
// convert [][]byte to []byte
p := convert(payload, payloadLen)
if ncWriter, ok := out.(remote.NocopyWrite); ok {
err = ncWriter.WriteDirect(payload, 0)
err = ncWriter.WriteDirect(p, 0)
} else {
_, err = out.WriteBinary(payload)
_, err = out.WriteBinary(p)
}
}
return err
Expand Down Expand Up @@ -479,15 +474,29 @@ func checkPayloadSize(payloadLen, maxSize int) error {

// getCRC32C calculates the crc32c checksum of the input bytes.
// the checksum will be converted into big-endian format and encoded into hex string.
func getCRC32C(payload []byte) string {
func getCRC32C(payload [][]byte) string {
if crc32cTable == nil {
return ""
}
csb := make([]byte, Size32)
binary.BigEndian.PutUint32(csb, crc32.Checksum(payload, crc32cTable))
var checksum uint32
for i := 0; i < len(payload); i++ {
checksum = crc32.Update(checksum, crc32cTable, payload[i])
}
binary.BigEndian.PutUint32(csb, checksum)
return hex.EncodeToString(csb)
}

func convert(b2 [][]byte, length int) []byte {
b1 := make([]byte, length)
off := 0
for i := 0; i < len(b2); i++ {
copy(b1[off:off+len(b2[i])], b2[i])
off += len(b2[i])
}
return b1
}

// checkCRC32C validates the crc32c checksum in the header.
func checkCRC32C(message remote.Message, in remote.ByteBuffer) error {
strInfo := message.TransInfo().TransStrInfo()
Expand Down
50 changes: 48 additions & 2 deletions pkg/remote/codec/default_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
ctx := context.Background()
intKVInfo := prepareIntKVInfo()
strKVInfo := prepareStrKVInfo()
sendMsg := initClientSendMsg(transport.TTHeaderFramed, 3*1024)
sendMsg := initClientSendMsg(transport.TTHeaderFramed, 32*1024)
sendMsg.TransInfo().PutTransIntInfo(intKVInfo)
sendMsg.TransInfo().PutTransStrInfo(strKVInfo)

Expand All @@ -240,7 +240,7 @@ func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
err := dc.Encode(ctx, sendMsg, badOut)
test.Assert(t, err != nil)

// encode
// encode with netpollBytebuffer
npBuffer := netpoll.NewReaderWriterByteBuffer(netpoll2.NewLinkBuffer())
err = dc.Encode(ctx, sendMsg, npBuffer)
test.Assert(t, err == nil, err)
Expand All @@ -249,7 +249,53 @@ func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
recvMsg := initServerRecvMsg()
buf, err := npBuffer.Bytes()
test.Assert(t, err == nil, err)
in := remote.NewReaderBuffer(buf)
err = dc.Decode(ctx, recvMsg, in)
test.Assert(t, err == nil, err)
intKVInfoRecv := recvMsg.TransInfo().TransIntInfo()
strKVInfoRecv := recvMsg.TransInfo().TransStrInfo()
test.DeepEqual(t, intKVInfoRecv, intKVInfo)
test.DeepEqual(t, strKVInfoRecv, strKVInfo)
test.Assert(t, sendMsg.RPCInfo().Invocation().SeqID() == recvMsg.RPCInfo().Invocation().SeqID())

// decode, crc32c check failed
test.Assert(t, err == nil, err)
bufLen := len(buf)
modifiedBuf := make([]byte, bufLen)
copy(modifiedBuf, buf)
for i := bufLen - 1; i > bufLen-10; i-- {
modifiedBuf[i] = 123
}
in = remote.NewReaderBuffer(modifiedBuf)
err = dc.Decode(ctx, recvMsg, in)
test.Assert(t, err != nil, err)
}

func TestDefaultCodecWithCRC32EncodeDecodeWithNonNetpollBuffer(t *testing.T) {
remote.PutPayloadCode(serviceinfo.Thrift, mpc)

dc := NewDefaultCodecWithConfig(CodecConfig{CRC32Check: true})
ctx := context.Background()
intKVInfo := prepareIntKVInfo()
strKVInfo := prepareStrKVInfo()
sendMsg := initClientSendMsg(transport.TTHeaderFramed, 32*1024)
sendMsg.TransInfo().PutTransIntInfo(intKVInfo)
sendMsg.TransInfo().PutTransStrInfo(strKVInfo)

// test encode err
badOut := remote.NewReaderBuffer(nil)
err := dc.Encode(ctx, sendMsg, badOut)
test.Assert(t, err != nil)

// encode with defaultByteBuffer
byteBuffer := remote.NewWriterBuffer(0)
err = dc.Encode(ctx, sendMsg, byteBuffer)
test.Assert(t, err == nil, err)

// decode, succeed
recvMsg := initServerRecvMsg()
buf, err := byteBuffer.Bytes()
test.Assert(t, err == nil, err)
in := remote.NewReaderBuffer(buf)
err = dc.Decode(ctx, recvMsg, in)
test.Assert(t, err == nil, err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/remote/trans/netpoll/bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ func (b *netpollByteBuffer) Bytes() (buf []byte, err error) {
return lb.Bytes(), nil
}

func (b *netpollByteBuffer) GetBytes() ([][]byte, int, error) {
lb := b.writer.(*netpoll.LinkBuffer)
if err := lb.Flush(); err != nil {
return nil, 0, err
}
return lb.GetBytes(nil), lb.Len(), nil
}

// Release will free the buffer already read.
// After release, buffer read by Next/Skip/Peek is invalid.
func (b *netpollByteBuffer) Release(e error) (err error) {
Expand Down

0 comments on commit f063ab6

Please sign in to comment.