From 41ceca05c1ca09a63dd70a3bdf8d6e538a10dcde Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Thu, 1 Sep 2022 16:51:06 -0700 Subject: [PATCH] [konnectivity-client] Send best-effort DIAL_CLS on dial timeout --- konnectivity-client/pkg/client/client.go | 17 +++++++++++++++++ konnectivity-client/pkg/client/client_test.go | 11 +++++++++++ 2 files changed, 28 insertions(+) diff --git a/konnectivity-client/pkg/client/client.go b/konnectivity-client/pkg/client/client.go index da3c439bb..165a1ba95 100644 --- a/konnectivity-client/pkg/client/client.go +++ b/konnectivity-client/pkg/client/client.go @@ -327,9 +327,11 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s t.connsLock.Unlock() case <-time.After(30 * time.Second): klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random) + go t.closeDial(random) return nil, &dialFailure{"dial timeout, backstop", DialFailureTimeout} case <-requestCtx.Done(): klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", requestCtx.Err(), "dialID", random) + go t.closeDial(random) return nil, &dialFailure{"dial timeout, context", DialFailureContext} } @@ -340,6 +342,21 @@ func (t *grpcTunnel) Done() <-chan struct{} { return t.done } +// Send a best-effort DIAL_CLS request for the given dial ID. +func (t *grpcTunnel) closeDial(dialID int64) { + req := &client.Packet{ + Type: client.PacketType_DIAL_CLS, + Payload: &client.Packet_CloseDial{ + CloseDial: &client.CloseDial{ + Random: dialID, + }, + }, + } + if err := t.stream.Send(req); err != nil { + klog.V(5).InfoS("Failed to send DIAL_CLS", "err", err, "dialID", dialID) + } +} + func GetDialFailureReason(err error) (isDialFailure bool, reason DialFailureReason) { var df *dialFailure if errors.As(err, &df) { diff --git a/konnectivity-client/pkg/client/client_test.go b/konnectivity-client/pkg/client/client_test.go index 4fd907c2f..8ac41b466 100644 --- a/konnectivity-client/pkg/client/client_test.go +++ b/konnectivity-client/pkg/client/client_test.go @@ -310,6 +310,11 @@ func TestDial_RequestContextCancelled(t *testing.T) { reqCancel() return nil // don't respond } + closeCh := make(chan struct{}) + ts.handlers[client.PacketType_DIAL_CLS] = func(*client.Packet) *client.Packet { + close(closeCh) + return nil // don't respond + } defer ps.Close() defer s.Close() @@ -332,6 +337,12 @@ func TestDial_RequestContextCancelled(t *testing.T) { } ts.assertPacketType(0, client.PacketType_DIAL_REQ) + select { + case <-closeCh: + ts.assertPacketType(1, client.PacketType_DIAL_CLS) + case <-time.After(30 * time.Second): + t.Fatal("Timed out waiting for DIAL_CLS packet") + } } func TestDial_BackendError(t *testing.T) {