Skip to content

Commit

Permalink
fix(gRPC): pass error when client transport is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh committed Aug 27, 2024
1 parent 237fae0 commit a612745
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 60 deletions.
54 changes: 28 additions & 26 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,14 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
// Send connection preface to server.
n, err := t.conn.Write(ClientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != ClientPrefaceLen {
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, ClientPrefaceLen)
err = connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, ClientPrefaceLen)
t.Close(err)
return nil, err
}

ss := []http2.Setting{
Expand All @@ -237,15 +239,17 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
}
err = t.framer.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}

// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}

Expand Down Expand Up @@ -594,7 +598,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
func (t *http2Client) Close(err error) error {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
Expand All @@ -617,12 +621,13 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
cErr := t.conn.Close()

// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
}
return err
return cErr
}

// GracefulClose sets the state to draining, which prevents new streams from
Expand All @@ -641,7 +646,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
return
}
t.controlBuf.put(&incomingGoAway{})
Expand Down Expand Up @@ -886,7 +891,7 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 != 1 {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
Expand All @@ -904,7 +909,7 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
Expand Down Expand Up @@ -936,7 +941,7 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}

Expand Down Expand Up @@ -1031,10 +1036,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.ReadFrame()
if err != nil {
// TODO(emma): comment this log temporarily, because when use short connection, 'resource temporarily unavailable' error will happen
// if the log need to be output, connection info should be appended
// klog.Errorf("KITEX: grpc readFrame failed, error=%s", err.Error())
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
Expand All @@ -1043,7 +1046,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*grpcframe.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "first frame received is not a setting frame")
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.handleSettings(sf, true)
Expand Down Expand Up @@ -1076,10 +1080,8 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
// TODO(emma): comment this log temporarily, because when use short connection, 'resource temporarily unavailable' error will happen
// if the log need to be output, connection info should be appended
// klog.Errorf("KITEX: grpc readFrame failed, error=%s", err.Error())
t.Close()
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
t.Close(err)
return
}
}
Expand Down Expand Up @@ -1137,7 +1139,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()
Expand Down
28 changes: 14 additions & 14 deletions pkg/remote/trans/nphttp2/grpc/keepalive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestMaxConnectionIdle(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -85,7 +85,7 @@ func TestMaxConnectionIdleBusyClient(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -162,7 +162,7 @@ func TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -226,7 +226,7 @@ func TestKeepaliveServerWithResponsiveClient(t *testing.T) {
},
})
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -258,7 +258,7 @@ func TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
if client == nil {
t.Fatalf("setUpWithNoPingServer failed, return nil client")
}
defer client.Close()
defer client.Close(selfCloseErr)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
if client == nil {
t.Fatalf("setUpWithNoPingServer failed, return nil client")
}
defer client.Close()
defer client.Close(selfCloseErr)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
if client == nil {
t.Fatalf("setUpWithNoPingServer failed, return nil client")
}
defer client.Close()
defer client.Close(selfCloseErr)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
},
})
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -400,7 +400,7 @@ func TestKeepaliveClientFrequency(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -444,7 +444,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -487,7 +487,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -536,7 +536,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -569,7 +569,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down Expand Up @@ -608,7 +608,7 @@ func TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T) {
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(selfCloseErr)
server.stop()
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/nphttp2/grpc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
Close() error
Close(err error) error

// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
Expand Down
Loading

0 comments on commit a612745

Please sign in to comment.