Skip to content

Commit

Permalink
fix: use netpoll buffer to encode payload to prevent length lost
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh committed Feb 22, 2024
1 parent 1c063ba commit 8171ffa
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 18 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/cloudwego/kitex

go 1.13

replace github.com/cloudwego/netpoll => github.com/ppzqh/netpoll v0.0.0-20240222152609-9926c0656f3a

require (
github.com/apache/thrift v0.13.0
github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ github.com/cloudwego/kitex v0.4.4/go.mod h1:3FcH5h9Qw+dhRljSzuGSpWuThttA8DvK0BsL
github.com/cloudwego/kitex v0.6.1/go.mod h1:zI1GBrjT0qloTikcCfQTgxg3Ws+yQMyaChEEOcGNUvA=
github.com/cloudwego/localsession v0.0.2 h1:N9/IDtCPj1fCL9bCTP+DbXx3f40YjVYWcwkJG0YhQkY=
github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8=
github.com/cloudwego/netpoll v0.2.4/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/cloudwego/netpoll v0.3.1/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/cloudwego/netpoll v0.4.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.5.1 h1:zDUF7xF0C97I10fGlQFJ4jg65khZZMUvSu/TWX44Ohc=
github.com/cloudwego/netpoll v0.5.1/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/thriftgo v0.1.2/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cloudwego/thriftgo v0.2.4/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
github.com/cloudwego/thriftgo v0.2.7/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
Expand Down Expand Up @@ -176,6 +171,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/ppzqh/netpoll v0.0.0-20240222152609-9926c0656f3a h1:GtXFa++1kgTs2i0abQcDdm4q70PFjxWghkj599xPeCE=
github.com/ppzqh/netpoll v0.0.0-20240222152609-9926c0656f3a/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
6 changes: 0 additions & 6 deletions pkg/remote/bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ type NocopyWrite interface {
WriteDirect(buf []byte, remainCap int) error
}

// NocopyWrittenBytesGetter is used to get the written bytes from the buffer without copy.
type NocopyWrittenBytesGetter interface {
// BytesNocopy is used to get the bytes written with nocopy.
BytesNocopy() (buf []byte, err error)
}

// FrameWrite is to write header and data buffer separately to avoid memory copy
type FrameWrite interface {
// WriteHeader set header buffer without copy
Expand Down
11 changes: 5 additions & 6 deletions pkg/remote/codec/default_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ import (
"sync"
"sync/atomic"

netpoll2 "github.com/cloudwego/netpoll"

"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/codec/perrors"
"github.com/cloudwego/kitex/pkg/remote/trans/netpoll"
"github.com/cloudwego/kitex/pkg/remote/transmeta"
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand Down Expand Up @@ -196,18 +199,14 @@ func (c *defaultCodec) encodeMetaAndPayloadWithCRC32C(ctx context.Context, messa
var err error

// 1. encode payload and calculate crc32c checksum
newPayloadOut := remote.NewWriterBuffer(PayloadBufferSize)
newPayloadOut := netpoll.NewWriterByteBuffer(netpoll2.NewLinkBuffer())

if err = me.EncodePayload(ctx, message, newPayloadOut); err != nil {
return err
}
// get the payload from buffer
var payload []byte
if nc, ok := newPayloadOut.(remote.NocopyWrittenBytesGetter); ok {
payload, err = nc.BytesNocopy()
} else {
payload, err = newPayloadOut.Bytes()
}
payload, err = newPayloadOut.Bytes()
newPayloadOut.Release(err)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion pkg/remote/trans/netpoll/bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func (b *netpollByteBuffer) AppendBuffer(buf remote.ByteBuffer) (err error) {

// Bytes are not supported in netpoll bytebuf.
func (b *netpollByteBuffer) Bytes() (buf []byte, err error) {
return nil, errors.New("method Bytes() not support in netpoll bytebuf")
lb := b.writer.(*netpoll.LinkBuffer)
return lb.WrittenBytes(), nil

Check failure on line 243 in pkg/remote/trans/netpoll/bytebuf.go

View workflow job for this annotation

GitHub Actions / unit-scenario-test

lb.WrittenBytes undefined (type *netpoll.LinkBuffer has no field or method WrittenBytes)
//return nil, errors.New("method Bytes() not support in netpoll bytebuf")
}

// Release will free the buffer already read.
Expand Down

0 comments on commit 8171ffa

Please sign in to comment.