Skip to content

Commit

Permalink
[konnectivity-client] Send best-effort DIAL_CLS on dial timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
tallclair committed Sep 7, 2022
1 parent c5ec7c9 commit 41ceca0
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
17 changes: 17 additions & 0 deletions konnectivity-client/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,11 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s
t.connsLock.Unlock()
case <-time.After(30 * time.Second):
klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random)
go t.closeDial(random)
return nil, &dialFailure{"dial timeout, backstop", DialFailureTimeout}
case <-requestCtx.Done():
klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", requestCtx.Err(), "dialID", random)
go t.closeDial(random)
return nil, &dialFailure{"dial timeout, context", DialFailureContext}
}

Expand All @@ -340,6 +342,21 @@ func (t *grpcTunnel) Done() <-chan struct{} {
return t.done
}

// Send a best-effort DIAL_CLS request for the given dial ID.
func (t *grpcTunnel) closeDial(dialID int64) {
req := &client.Packet{
Type: client.PacketType_DIAL_CLS,
Payload: &client.Packet_CloseDial{
CloseDial: &client.CloseDial{
Random: dialID,
},
},
}
if err := t.stream.Send(req); err != nil {
klog.V(5).InfoS("Failed to send DIAL_CLS", "err", err, "dialID", dialID)
}
}

func GetDialFailureReason(err error) (isDialFailure bool, reason DialFailureReason) {
var df *dialFailure
if errors.As(err, &df) {
Expand Down
11 changes: 11 additions & 0 deletions konnectivity-client/pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ func TestDial_RequestContextCancelled(t *testing.T) {
reqCancel()
return nil // don't respond
}
closeCh := make(chan struct{})
ts.handlers[client.PacketType_DIAL_CLS] = func(*client.Packet) *client.Packet {
close(closeCh)
return nil // don't respond
}

defer ps.Close()
defer s.Close()
Expand All @@ -332,6 +337,12 @@ func TestDial_RequestContextCancelled(t *testing.T) {
}

ts.assertPacketType(0, client.PacketType_DIAL_REQ)
select {
case <-closeCh:
ts.assertPacketType(1, client.PacketType_DIAL_CLS)
case <-time.After(30 * time.Second):
t.Fatal("Timed out waiting for DIAL_CLS packet")
}
}

func TestDial_BackendError(t *testing.T) {
Expand Down

0 comments on commit 41ceca0

Please sign in to comment.