Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple konn-client cleanups #343

Merged
merged 3 commits into from
Sep 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 40 additions & 39 deletions konnectivity-client/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,25 +224,25 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
// In either scenario, we should return here and close the tunnel as it is no longer needed.
klog.V(1).InfoS("DialResp not recognized; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
} else {
result := dialResult{connid: resp.ConnectID}
if resp.Error != "" {
result.err = &dialFailure{resp.Error, DialFailureEndpoint}
}
select {
// try to send to the result channel
case pendingDial.resultCh <- result:
// unblock if the cancel channel is closed
case <-pendingDial.cancelCh:
// Note: this condition can only be hit by a race condition where the
// DialContext() returns early (timeout) after the pendingDial is already
// fetched here, but before the result is sent.
klog.V(1).InfoS("Pending dial has been cancelled; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
case <-tunnelCtx.Done():
klog.V(1).InfoS("Tunnel has been closed; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a line break after this line


result := dialResult{connid: resp.ConnectID}
if resp.Error != "" {
result.err = &dialFailure{resp.Error, DialFailureEndpoint}
}
select {
// try to send to the result channel
case pendingDial.resultCh <- result:
// unblock if the cancel channel is closed
case <-pendingDial.cancelCh:
// Note: this condition can only be hit by a race condition where the
// DialContext() returns early (timeout) after the pendingDial is already
// fetched here, but before the result is sent.
klog.V(1).InfoS("Pending dial has been cancelled; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
case <-tunnelCtx.Done():
klog.V(1).InfoS("Tunnel has been closed; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
}

if resp.Error != "" {
Expand Down Expand Up @@ -281,33 +281,34 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
// TODO: flow control
conn, ok := t.conns.get(resp.ConnectID)

if ok {
timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
select {
case conn.readCh <- resp.Data:
timer.Stop()
case <-timer.C:
klog.ErrorS(fmt.Errorf("timeout"), "readTimeout has been reached, the grpc connection to the proxy server will be closed", "connectionID", conn.connID, "readTimeoutSeconds", t.readTimeoutSeconds)
return
case <-tunnelCtx.Done():
klog.V(1).InfoS("Tunnel has been closed, the grpc connection to the proxy server will be closed", "connectionID", conn.connID)
}
} else {
klog.V(1).InfoS("connection not recognized", "connectionID", resp.ConnectID)
if !ok {
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
continue
}
timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
select {
case conn.readCh <- resp.Data:
timer.Stop()
case <-timer.C:
klog.ErrorS(fmt.Errorf("timeout"), "readTimeout has been reached, the grpc connection to the proxy server will be closed", "connectionID", conn.connID, "readTimeoutSeconds", t.readTimeoutSeconds)
return
case <-tunnelCtx.Done():
klog.V(1).InfoS("Tunnel has been closed, the grpc connection to the proxy server will be closed", "connectionID", conn.connID)
}

case client.PacketType_CLOSE_RSP:
resp := pkt.GetCloseResponse()
conn, ok := t.conns.get(resp.ConnectID)

if ok {
close(conn.readCh)
conn.closeCh <- resp.Error
close(conn.closeCh)
t.conns.remove(resp.ConnectID)
return
if !ok {
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
continue
}
klog.V(1).InfoS("connection not recognized", "connectionID", resp.ConnectID)
close(conn.readCh)
conn.closeCh <- resp.Error
close(conn.closeCh)
t.conns.remove(resp.ConnectID)
return
}
}
}
Expand Down