Skip to content

Commit

Permalink
Merge pull request #392 from cheftako/goroutineLabel
Browse files Browse the repository at this point in the history
Added goroutine labels to goroutine calls.
  • Loading branch information
k8s-ci-robot authored Sep 7, 2022
2 parents 85e1558 + 559bde9 commit aedf2cf
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 104 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ proto/agent/agent.pb.go: proto/agent/agent.proto
## --------------------------------------

easy-rsa.tar.gz:
curl -L -o easy-rsa.tar.gz --connect-timeout 20 --retry 6 --retry-delay 2 https://github.com/OpenVPN/easy-rsa/archive/refs/tags/v3.0.8.tar.gz
curl -L -o easy-rsa.tar.gz --connect-timeout 20 --retry 6 --retry-delay 2 https://storage.googleapis.com/kubernetes-release/easy-rsa/easy-rsa.tar.gz

easy-rsa: easy-rsa.tar.gz
tar xvf easy-rsa.tar.gz
mv easy-rsa-3.0.8 easy-rsa
mv easy-rsa-master easy-rsa

cfssl:
@if ! command -v cfssl &> /dev/null; then \
Expand Down
42 changes: 28 additions & 14 deletions cmd/agent/app/server.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package app

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/http/pprof"
"runtime"
runpprof "runtime/pprof"
"strconv"
"time"

Expand Down Expand Up @@ -103,17 +105,23 @@ func (a *Agent) runHealthServer(o *options.GrpcProxyAgentOptions) error {
ReadHeaderTimeout: ReadHeaderTimeout,
}

go func() {
err := healthServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "health server could not listen")
}
klog.V(0).Infoln("Health server stopped listening")
}()
labels := runpprof.Labels(
"core", "healthListener",
"port", strconv.Itoa(o.HealthServerPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveHealth(healthServer) })

return nil
}

func (a *Agent) serveHealth(healthServer *http.Server) {
err := healthServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "health server could not listen")
}
klog.V(0).Infoln("Health server stopped listening")
}

func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error {
muxHandler := http.NewServeMux()
muxHandler.Handle("/metrics", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -140,13 +148,19 @@ func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error {
ReadHeaderTimeout: ReadHeaderTimeout,
}

go func() {
err := adminServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "admin server could not listen")
}
klog.V(0).Infoln("Admin server stopped listening")
}()
labels := runpprof.Labels(
"core", "adminListener",
"port", strconv.Itoa(o.AdminServerPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveAdmin(adminServer) })

return nil
}

func (a *Agent) serveAdmin(adminServer *http.Server) {
err := adminServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "admin server could not listen")
}
klog.V(0).Infoln("Admin server stopped listening")
}
27 changes: 21 additions & 6 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/http"
"os"
"os/signal"
runpprof "runtime/pprof"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -254,7 +255,11 @@ func (c *Client) run(o *GrpcProxyClientOptions) error {
for i := 1; i <= o.testRequests; i++ {
i := i
wg.Add(1)
go func() {
labels := runpprof.Labels(
"core", "testClient",
"testID", strconv.Itoa(i),
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
defer wg.Done()
dialer, err := c.getDialer(o)
if err != nil {
Expand Down Expand Up @@ -289,7 +294,7 @@ func (c *Client) run(o *GrpcProxyClientOptions) error {
time.Sleep(wait)
}
ch <- nil
}()
})
}
wg.Wait()
close(ch)
Expand Down Expand Up @@ -404,13 +409,18 @@ func (c *Client) getUDSDialer(o *GrpcProxyClientOptions) (func(ctx context.Conte
ch := make(chan os.Signal, 1)
signal.Notify(ch)

go func() {
labels := runpprof.Labels(
"core", "testUdsDialer",
"requestPath", o.requestPath,
"udsName", o.proxyUdsName,
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
<-ch
if proxyConn != nil {
err := proxyConn.Close()
klog.ErrorS(err, "connection closed")
}
}()
})

switch o.mode {
case "grpc":
Expand Down Expand Up @@ -482,13 +492,18 @@ func (c *Client) getMTLSDialer(o *GrpcProxyClientOptions) (func(ctx context.Cont
ch := make(chan os.Signal, 1)
signal.Notify(ch)

go func() {
labels := runpprof.Labels(
"core", "testMTLSDialer",
"requestPath", o.requestPath,
"requestPort", strconv.Itoa(o.requestPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
<-ch
if proxyConn != nil {
err := proxyConn.Close()
klog.ErrorS(err, "connection closed")
}
}()
})

switch o.mode {
case "grpc":
Expand Down
72 changes: 53 additions & 19 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"io/ioutil"
"net"
"net/http"
"net/http/pprof"
netpprof "net/http/pprof"
"os"
"os/signal"
"path/filepath"
"runtime"
runpprof "runtime/pprof"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -151,16 +152,21 @@ func SetupSignalHandler() (stopCh <-chan struct{}) {
stop := make(chan struct{})
c := make(chan os.Signal, 2)
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
os.Exit(1) // second signal. Exit directly.
}()
labels := runpprof.Labels(
"core", "signalHandler",
)
go runpprof.Do(context.Background(), labels, func(context.Context) { handleSignals(c, stop) })

return stop
}

func handleSignals(signalCh chan os.Signal, stopCh chan struct{}) {
<-signalCh
close(stopCh)
<-signalCh
os.Exit(1) // second signal. Exit directly.
}

func getUDSListener(ctx context.Context, udsName string) (net.Listener, error) {
udsListenerLock.Lock()
defer udsListenerLock.Unlock()
Expand Down Expand Up @@ -198,7 +204,11 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
if err != nil {
return nil, fmt.Errorf("failed to get uds listener: %v", err)
}
go grpcServer.Serve(lis)
labels := runpprof.Labels(
"core", "udsGrpcFrontend",
"udsFile", o.UdsName,
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
} else {
// http-connect
Expand All @@ -212,7 +222,11 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
err := server.Shutdown(ctx)
klog.ErrorS(err, "error shutting down server")
}
go func() {
labels := runpprof.Labels(
"core", "udsHttpFrontend",
"udsFile", o.UdsName,
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
udsListener, err := getUDSListener(ctx, o.UdsName)
if err != nil {
klog.ErrorS(err, "failed to get uds listener")
Expand All @@ -225,7 +239,7 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
if err != nil {
klog.ErrorS(err, "failed to serve uds requests")
}
}()
})
}

return stop, nil
Expand Down Expand Up @@ -286,7 +300,11 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
if err != nil {
return nil, fmt.Errorf("failed to listen on %s: %v", addr, err)
}
go grpcServer.Serve(lis)
labels := runpprof.Labels(
"core", "mtlsGrpcFrontend",
"port", strconv.FormatUint(uint64(o.ServerPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
} else {
// http-connect
Expand All @@ -305,12 +323,16 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
klog.ErrorS(err, "failed to shutdown server")
}
}
go func() {
labels := runpprof.Labels(
"core", "mtlsHttpFrontend",
"port", strconv.FormatUint(uint64(o.ServerPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
err := server.ListenAndServeTLS("", "") // empty files defaults to tlsConfig
if err != nil {
klog.ErrorS(err, "failed to listen on frontend port")
}
}()
})
}

return stop, nil
Expand Down Expand Up @@ -338,7 +360,11 @@ func (p *Proxy) runAgentServer(o *options.ProxyRunOptions, server *server.ProxyS
if err != nil {
return fmt.Errorf("failed to listen on %s: %v", addr, err)
}
go grpcServer.Serve(lis)
labels := runpprof.Labels(
"core", "agentListener",
"port", strconv.FormatUint(uint64(o.AgentPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })

return nil
}
Expand All @@ -348,7 +374,7 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS
muxHandler.Handle("/metrics", promhttp.Handler())
if o.EnableProfiling {
muxHandler.HandleFunc("/debug/pprof", util.RedirectTo("/debug/pprof/"))
muxHandler.HandleFunc("/debug/pprof/", pprof.Index)
muxHandler.HandleFunc("/debug/pprof/", netpprof.Index)
if o.EnableContentionProfiling {
runtime.SetBlockProfileRate(1)
}
Expand All @@ -359,13 +385,17 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS
MaxHeaderBytes: 1 << 20,
}

go func() {
labels := runpprof.Labels(
"core", "adminListener",
"port", strconv.FormatUint(uint64(o.AdminPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
err := adminServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "admin server could not listen")
}
klog.V(1).Infoln("Admin server stopped listening")
}()
})

return nil
}
Expand Down Expand Up @@ -397,13 +427,17 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy
ReadHeaderTimeout: ReadHeaderTimeout,
}

go func() {
labels := runpprof.Labels(
"core", "healthListener",
"port", strconv.FormatUint(uint64(o.HealthPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
err := healthServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "health server could not listen")
}
klog.V(1).Infoln("Health server stopped listening")
}()
})

return nil
}
27 changes: 22 additions & 5 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io/ioutil"
"net"
"net/url"
runpprof "runtime/pprof"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -430,7 +431,15 @@ func (a *Client) Serve() {
klog.ErrorS(fmt.Errorf("connection is nil"), "cannot send CLOSE_RESP to nil connection")
}
}
go func() {
labels := runpprof.Labels(
"agentID", a.agentID,
"agentIdentifiers", a.agentIdentifiers,
"serverAddress", a.address,
"serverID", a.serverID,
"dialID", strconv.FormatInt(dialReq.Random, 10),
"dialAddress", dialReq.Address,
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
defer close(dialDone)
start := time.Now()
conn, err := net.DialTimeout(dialReq.Protocol, dialReq.Address, dialTimeout)
Expand All @@ -447,19 +456,27 @@ func (a *Client) Serve() {
connCtx.conn = conn
a.connManager.Add(connID, connCtx)
dialResp.GetDialResponse().ConnectID = connID
labels := runpprof.Labels(
"agentID", a.agentID,
"agentIdentifiers", a.agentIdentifiers,
"serverAddress", a.address,
"serverID", a.serverID,
"connectionID", strconv.FormatInt(connID, 10),
"dialAddress", dialReq.Address,
)
if err := a.Send(dialResp); err != nil {
klog.ErrorS(err, "could not send dialResp")
// clean-up is normally called from remoteToProxy which we will never invoke.
// So we are invoking it here to force the clean-up to occur.
// However, cleanup will block until dialDone is closed.
// So placing cleanup on its own goroutine to wait for the deferred close(dialDone) to kick in.
go connCtx.cleanup()
go runpprof.Do(context.Background(), labels, func(context.Context) { connCtx.cleanup() })
return
}
klog.V(3).InfoS("Proxying new connection", "connectionID", connID)
go a.remoteToProxy(connID, connCtx)
go a.proxyToRemote(connID, connCtx)
}()
go runpprof.Do(context.Background(), labels, func(context.Context) { a.remoteToProxy(connID, connCtx) })
go runpprof.Do(context.Background(), labels, func(context.Context) { a.proxyToRemote(connID, connCtx) })
})

case client.PacketType_DATA:
data := pkt.GetData()
Expand Down
Loading

0 comments on commit aedf2cf

Please sign in to comment.