From 7b8da557ea89bde07198ab83faeea2e125b807da Mon Sep 17 00:00:00 2001 From: tycho garen Date: Tue, 21 Feb 2023 12:44:17 -0500 Subject: [PATCH 1/5] agent: plumb contexts through --- cmd/autoscaler-agent/main.go | 9 +++++++-- pkg/agent/entrypoint.go | 6 ++---- pkg/agent/globalstate.go | 4 ++-- pkg/agent/informant.go | 35 ++++++++++++++++++----------------- pkg/informant/endpoints.go | 9 +++++---- pkg/util/handle.go | 5 +++-- 6 files changed, 37 insertions(+), 31 deletions(-) diff --git a/cmd/autoscaler-agent/main.go b/cmd/autoscaler-agent/main.go index 4c1b7ff67..bf22494be 100644 --- a/cmd/autoscaler-agent/main.go +++ b/cmd/autoscaler-agent/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "k8s.io/client-go/kubernetes" scheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -48,8 +50,11 @@ func main() { VMClient: vmClient, } - err = runner.Run() - if err != nil { + // TODO: add a signal handler here? + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err = runner.Run(ctx); err != nil { klog.Fatalf("Main loop failed: %s", err) } klog.Info("Main loop returned without issue. Exiting.") diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index 86a7c61d1..4dede6363 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -22,9 +22,7 @@ type MainRunner struct { VMClient *vmclient.Clientset } -func (r MainRunner) Run() error { - ctx := context.Background() - +func (r MainRunner) Run(ctx context.Context) error { podEvents := make(chan podEvent) buildInfo := util.GetBuildInfo() @@ -72,7 +70,7 @@ func (r MainRunner) Run() error { globalState.Stop() return nil case event := <-podEvents: - globalState.handleEvent(event) + globalState.handleEvent(ctx, event) } } } diff --git a/pkg/agent/globalstate.go b/pkg/agent/globalstate.go index 6137c42f9..7e6ecf592 100644 --- a/pkg/agent/globalstate.go +++ b/pkg/agent/globalstate.go @@ -57,7 +57,7 @@ func (s *agentState) Stop() { } } -func (s *agentState) handleEvent(event podEvent) { +func (s *agentState) handleEvent(ctx context.Context, event podEvent) { klog.Infof("Handling pod event %+v", event) state, hasPod := s.pods[event.podName] @@ -77,7 +77,7 @@ func (s *agentState) handleEvent(event podEvent) { return } - runnerCtx, cancelRunnerContext := context.WithCancel(context.TODO()) + runnerCtx, cancelRunnerContext := context.WithCancel(ctx) status := &podStatus{ lock: sync.Mutex{}, diff --git a/pkg/agent/informant.go b/pkg/agent/informant.go index 2bb9bbd9c..782b6743b 100644 --- a/pkg/agent/informant.go +++ b/pkg/agent/informant.go @@ -89,7 +89,7 @@ type InformantServer struct { // exit signals that the server should shut down, and sets exitStatus to status. // // This function MUST be called while holding runner.lock. - exit func(status InformantServerExitStatus) + exit func(ctx context.Context, status InformantServerExitStatus) } type InformantServerMode string @@ -180,7 +180,7 @@ func NewInformantServer( backgroundCtx, cancelBackground := context.WithCancel(ctx) // note: docs for server.exit guarantee this function is called while holding runner.lock. 
- server.exit = func(status InformantServerExitStatus) { + server.exit = func(ctx context.Context, status InformantServerExitStatus) { sendFinished.Send() cancelBackground() @@ -199,7 +199,7 @@ func NewInformantServer( // we need to spawn these in separate threads so the caller doesn't block while holding // runner.lock - runner.spawnBackgroundWorker(context.TODO(), shutdownName, func(c context.Context) { + runner.spawnBackgroundWorker(ctx, shutdownName, func(c context.Context) { if err := httpServer.Shutdown(c); err != nil { runner.logger.Warningf("Error shutting down InformantServer: %s", err) } @@ -207,7 +207,7 @@ func NewInformantServer( if server.madeContact { // only unregister the server if we could have plausibly contacted the informant unregisterName := fmt.Sprintf("InformantServer unregister (%s)", server.desc.AgentID) - runner.spawnBackgroundWorker(context.TODO(), unregisterName, func(c context.Context) { + runner.spawnBackgroundWorker(ctx, unregisterName, func(c context.Context) { if err := server.unregisterFromInformant(c); err != nil { runner.logger.Warningf("Error unregistering %s: %s", server.desc.AgentID, err) } @@ -382,7 +382,7 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context) error { // We shouldn't *assume* that restarting will actually fix it though, so we'll still set // RetryShouldFix = false. if 400 <= statusCode && statusCode <= 599 { - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: err, RetryShouldFix: false, }) @@ -441,7 +441,7 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context) error { err := errors.New("Protocol violation: Informant responded to /register with 200 without requesting /id") s.setLastInformantError(err, true) s.runner.logger.Errorf("%s", err) - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: err, RetryShouldFix: false, }) @@ -614,7 +614,7 @@ func (s *InformantServer) informantURL(path string) string { // of that context. // // Returns: response body (if successful), status code, error (if unsuccessful) -func (s *InformantServer) handleID(body *struct{}) (*api.AgentIdentificationMessage, int, error) { +func (s *InformantServer) handleID(ctx context.Context, body *struct{}) (*api.AgentIdentificationMessage, int, error) { s.runner.lock.Lock() defer s.runner.lock.Unlock() @@ -634,7 +634,7 @@ func (s *InformantServer) handleID(body *struct{}) (*api.AgentIdentificationMess // outside of that context. // // Returns: response body (if successful), status code, error (if unsuccessful) -func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentificationMessage, int, error) { +func (s *InformantServer) handleResume(ctx context.Context, body *api.ResumeAgent) (*api.AgentIdentificationMessage, int, error) { if body.ExpectedID != s.desc.AgentID { s.runner.logger.Warningf("AgentID %q not found, server has %q", body.ExpectedID, s.desc.AgentID) return nil, 404, fmt.Errorf("AgentID %q not found", body.ExpectedID) @@ -662,7 +662,7 @@ func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentif s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. 
- s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -673,7 +673,7 @@ func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentif s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -693,7 +693,7 @@ func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentif // called outside of that context. // // Returns: response body (if successful), status code, error (if unsuccessful) -func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdentificationMessage, int, error) { +func (s *InformantServer) handleSuspend(ctx context.Context, body *api.SuspendAgent) (*api.AgentIdentificationMessage, int, error) { if body.ExpectedID != s.desc.AgentID { s.runner.logger.Warningf("AgentID %q not found, server has %q", body.ExpectedID, s.desc.AgentID) return nil, 404, fmt.Errorf("AgentID %q not found", body.ExpectedID) @@ -717,7 +717,7 @@ func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdent s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -728,7 +728,7 @@ func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdent s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -747,6 +747,7 @@ func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdent // // Returns: response body (if successful), status code, error (if unsuccessful) func (s *InformantServer) handleTryUpscale( + ctx context.Context, body *api.MoreResourcesRequest, ) (*api.AgentIdentificationMessage, int, error) { if body.ExpectedID != s.desc.AgentID { @@ -789,7 +790,7 @@ func (s *InformantServer) handleTryUpscale( s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -800,7 +801,7 @@ func (s *InformantServer) handleTryUpscale( s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. 
- s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -843,7 +844,7 @@ func (s *InformantServer) Downscale(ctx context.Context, to api.Resources) (*api s.setLastInformantError(fmt.Errorf("Downscale request failed: %w", err), true) if 400 <= statusCode && statusCode <= 599 { - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: err, RetryShouldFix: statusCode == 404, }) @@ -885,7 +886,7 @@ func (s *InformantServer) Upscale(ctx context.Context, to api.Resources) error { s.setLastInformantError(fmt.Errorf("Downscale request failed: %w", err), true) if 400 <= statusCode && statusCode <= 599 { - s.exit(InformantServerExitStatus{ + s.exit(ctx, InformantServerExitStatus{ Err: err, RetryShouldFix: statusCode == 404, }) diff --git a/pkg/informant/endpoints.go b/pkg/informant/endpoints.go index 602011339..42e593408 100644 --- a/pkg/informant/endpoints.go +++ b/pkg/informant/endpoints.go @@ -3,6 +3,7 @@ package informant // This file contains the high-level handlers for various HTTP endpoints import ( + "context" "errors" "fmt" "sync" @@ -95,7 +96,7 @@ func WithCgroup(cgm *CgroupManager, config CgroupConfig) NewStateOpts { // RegisterAgent registers a new or updated autoscaler-agent // // Returns: body (if successful), status code, error (if unsuccessful) -func (s *State) RegisterAgent(info *api.AgentDesc) (*api.InformantDesc, int, error) { +func (s *State) RegisterAgent(ctx context.Context, info *api.AgentDesc) (*api.InformantDesc, int, error) { protoVersion, status, err := s.agents.RegisterNewAgent(info) if err != nil { return nil, status, err @@ -115,7 +116,7 @@ func (s *State) RegisterAgent(info *api.AgentDesc) (*api.InformantDesc, int, err // amount is ok // // Returns: body (if successful), status code and error (if unsuccessful) -func (s *State) TryDownscale(target *api.RawResources) (*api.DownscaleResult, int, error) { +func (s *State) TryDownscale(ctx context.Context, target *api.RawResources) (*api.DownscaleResult, int, error) { // If we aren't interacting with a cgroup, then we don't need to do anything. if s.cgroup == nil { return &api.DownscaleResult{Ok: true, Status: "No action taken (no cgroup enabled)"}, 200, nil @@ -173,7 +174,7 @@ func (s *State) TryDownscale(target *api.RawResources) (*api.DownscaleResult, in // NotifyUpscale signals that the VM's resource usage has been increased to the new amount // // Returns: body (if successful), status code and error (if unsuccessful) -func (s *State) NotifyUpscale(newResources *api.RawResources) (*struct{}, int, error) { +func (s *State) NotifyUpscale(ctx context.Context, newResources *api.RawResources) (*struct{}, int, error) { // FIXME: we shouldn't just trust what the agent says // // Because of race conditions like in , @@ -216,7 +217,7 @@ func (s *State) NotifyUpscale(newResources *api.RawResources) (*struct{}, int, e // If a different autoscaler-agent is currently registered, this method will do nothing. 
// // Returns: body (if successful), status code and error (if unsuccessful) -func (s *State) UnregisterAgent(info *api.AgentDesc) (*api.UnregisterAgent, int, error) { +func (s *State) UnregisterAgent(ctx context.Context, info *api.AgentDesc) (*api.UnregisterAgent, int, error) { agent, ok := s.agents.Get(info.AgentID) if !ok { return nil, 404, fmt.Errorf("No agent with ID %q", info.AgentID) diff --git a/pkg/util/handle.go b/pkg/util/handle.go index da8e1eb2b..5629e2e06 100644 --- a/pkg/util/handle.go +++ b/pkg/util/handle.go @@ -3,6 +3,7 @@ package util // Wrapper file for the AddHandler function import ( + "context" "encoding/json" "errors" "fmt" @@ -22,7 +23,7 @@ func AddHandler[T any, R any]( endpoint string, method string, reqTypeName string, - handle func(*T) (_ *R, statusCode int, _ error), + handle func(context.Context, *T) (_ *R, statusCode int, _ error), ) { errBadMethod := []byte("request method must be " + method) @@ -44,7 +45,7 @@ func AddHandler[T any, R any]( klog.Infof("%sReceived request on %s (client = %s) %+v", logPrefix, endpoint, r.RemoteAddr, req) - resp, status, err := handle(&req) + resp, status, err := handle(r.Context(), &req) if err == nil && status != http.StatusOK { err = errors.New("HTTP handler error: status != 200 OK, but no error message") From 48eecba224390e5010750047f50ee8a52ec3810b Mon Sep 17 00:00:00 2001 From: tycho garen Date: Tue, 21 Feb 2023 15:53:06 -0500 Subject: [PATCH 2/5] fixup --- pkg/agent/informant.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/agent/informant.go b/pkg/agent/informant.go index 782b6743b..7035f744c 100644 --- a/pkg/agent/informant.go +++ b/pkg/agent/informant.go @@ -199,16 +199,26 @@ func NewInformantServer( // we need to spawn these in separate threads so the caller doesn't block while holding // runner.lock - runner.spawnBackgroundWorker(ctx, shutdownName, func(c context.Context) { - if err := httpServer.Shutdown(c); err != nil { + runner.spawnBackgroundWorker(ctx, shutdownName, func(context.Context) { + // we want shutdown to (potentially) live longer than the request which + // made it, but having a timeout is still good. + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + if err := httpServer.Shutdown(ctx); err != nil { runner.logger.Warningf("Error shutting down InformantServer: %s", err) } }) if server.madeContact { // only unregister the server if we could have plausibly contacted the informant unregisterName := fmt.Sprintf("InformantServer unregister (%s)", server.desc.AgentID) - runner.spawnBackgroundWorker(ctx, unregisterName, func(c context.Context) { - if err := server.unregisterFromInformant(c); err != nil { + runner.spawnBackgroundWorker(ctx, unregisterName, func(context.Context) { + // we want shutdown to (potentially) live longer than the request which + // made it, but having a timeout is still good. 
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := server.unregisterFromInformant(ctx); err != nil { runner.logger.Warningf("Error unregistering %s: %s", server.desc.AgentID, err) } }) From 263296002cdd57b8e5b0a2e5448765d7c9c99610 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 24 Feb 2023 09:48:45 -0500 Subject: [PATCH 3/5] Update cmd/autoscaler-agent/main.go Co-authored-by: sharnoff --- cmd/autoscaler-agent/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/autoscaler-agent/main.go b/cmd/autoscaler-agent/main.go index bf22494be..4da7f7798 100644 --- a/cmd/autoscaler-agent/main.go +++ b/cmd/autoscaler-agent/main.go @@ -50,7 +50,7 @@ func main() { VMClient: vmClient, } - // TODO: add a signal handler here? + // TODO: add a signal handler here ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 1381579fcaacf2e3029a3bdb7a9114f6c7fb1fcd Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 24 Feb 2023 08:50:58 -0500 Subject: [PATCH 4/5] cr1 --- pkg/agent/informant.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/agent/informant.go b/pkg/agent/informant.go index 7035f744c..1649a4807 100644 --- a/pkg/agent/informant.go +++ b/pkg/agent/informant.go @@ -89,7 +89,7 @@ type InformantServer struct { // exit signals that the server should shut down, and sets exitStatus to status. // // This function MUST be called while holding runner.lock. - exit func(ctx context.Context, status InformantServerExitStatus) + exit func(status InformantServerExitStatus) } type InformantServerMode string @@ -180,7 +180,7 @@ func NewInformantServer( backgroundCtx, cancelBackground := context.WithCancel(ctx) // note: docs for server.exit guarantee this function is called while holding runner.lock. - server.exit = func(ctx context.Context, status InformantServerExitStatus) { + server.exit = func(status InformantServerExitStatus) { sendFinished.Send() cancelBackground() @@ -392,7 +392,7 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context) error { // We shouldn't *assume* that restarting will actually fix it though, so we'll still set // RetryShouldFix = false. if 400 <= statusCode && statusCode <= 599 { - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: err, RetryShouldFix: false, }) @@ -451,7 +451,7 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context) error { err := errors.New("Protocol violation: Informant responded to /register with 200 without requesting /id") s.setLastInformantError(err, true) s.runner.logger.Errorf("%s", err) - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: err, RetryShouldFix: false, }) @@ -672,7 +672,7 @@ func (s *InformantServer) handleResume(ctx context.Context, body *api.ResumeAgen s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -683,7 +683,7 @@ func (s *InformantServer) handleResume(ctx context.Context, body *api.ResumeAgen s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. 
- s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -727,7 +727,7 @@ func (s *InformantServer) handleSuspend(ctx context.Context, body *api.SuspendAg s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -738,7 +738,7 @@ func (s *InformantServer) handleSuspend(ctx context.Context, body *api.SuspendAg s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -800,7 +800,7 @@ func (s *InformantServer) handleTryUpscale( s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -811,7 +811,7 @@ func (s *InformantServer) handleTryUpscale( s.runner.logger.Warningf("%s", internalErr) // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: internalErr, RetryShouldFix: true, }) @@ -854,7 +854,7 @@ func (s *InformantServer) Downscale(ctx context.Context, to api.Resources) (*api s.setLastInformantError(fmt.Errorf("Downscale request failed: %w", err), true) if 400 <= statusCode && statusCode <= 599 { - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: err, RetryShouldFix: statusCode == 404, }) @@ -896,7 +896,7 @@ func (s *InformantServer) Upscale(ctx context.Context, to api.Resources) error { s.setLastInformantError(fmt.Errorf("Downscale request failed: %w", err), true) if 400 <= statusCode && statusCode <= 599 { - s.exit(ctx, InformantServerExitStatus{ + s.exit(InformantServerExitStatus{ Err: err, RetryShouldFix: statusCode == 404, }) From 0d19832b8e2d797436acb0880437bf4cf8e12b25 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Tue, 28 Feb 2023 22:15:28 -0500 Subject: [PATCH 5/5] cr~ish --- cmd/autoscaler-agent/main.go | 9 +++++++-- go.mod | 2 +- go.sum | 2 ++ pkg/agent/informant.go | 7 ++++--- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cmd/autoscaler-agent/main.go b/cmd/autoscaler-agent/main.go index 4da7f7798..9f6c27d67 100644 --- a/cmd/autoscaler-agent/main.go +++ b/cmd/autoscaler-agent/main.go @@ -2,6 +2,10 @@ package main import ( "context" + "os/signal" + "syscall" + + "github.com/tychoish/fun/srv" "k8s.io/client-go/kubernetes" scheme "k8s.io/client-go/kubernetes/scheme" @@ -50,9 +54,10 @@ func main() { VMClient: vmClient, } - // TODO: add a signal handler here - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM) defer cancel() + ctx = srv.SetShutdown(ctx) + ctx = srv.SetBaseContext(ctx) if err = runner.Run(ctx); err != nil { klog.Fatalf("Main loop failed: %s", err) diff --git a/go.mod b/go.mod index c1f05fd32..3d3e91061 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/elastic/go-sysinfo v1.9.0 github.com/google/uuid v1.3.0 github.com/neondatabase/neonvm v0.4.5 - github.com/tychoish/fun v0.3.3 + github.com/tychoish/fun v0.7.0 golang.org/x/exp 
v0.0.0-20221126150942-6ab00d035af9 k8s.io/api v0.23.15 k8s.io/apimachinery v0.23.15 diff --git a/go.sum b/go.sum index 71770f182..73924d1e2 100644 --- a/go.sum +++ b/go.sum @@ -654,6 +654,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tychoish/fun v0.3.3 h1:oyptuZ9SL8vzPamERbZA2PnG7VMFyMBimVlJXn9rgas= github.com/tychoish/fun v0.3.3/go.mod h1:84A+BwGecz23UotmbB4mtvVS5ZcsZpspecduxpwF/XM= +github.com/tychoish/fun v0.7.0 h1:7bT+TGFBZfhqfVjjyqhj7ze2GelUH+hZL3FYnL7x/dQ= +github.com/tychoish/fun v0.7.0/go.mod h1:84A+BwGecz23UotmbB4mtvVS5ZcsZpspecduxpwF/XM= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= diff --git a/pkg/agent/informant.go b/pkg/agent/informant.go index 1649a4807..f67b43545 100644 --- a/pkg/agent/informant.go +++ b/pkg/agent/informant.go @@ -13,6 +13,7 @@ import ( "time" "github.com/google/uuid" + "github.com/tychoish/fun/srv" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/util" @@ -199,10 +200,10 @@ func NewInformantServer( // we need to spawn these in separate threads so the caller doesn't block while holding // runner.lock - runner.spawnBackgroundWorker(ctx, shutdownName, func(context.Context) { + runner.spawnBackgroundWorker(srv.GetBaseContext(ctx), shutdownName, func(context.Context) { // we want shutdown to (potentially) live longer than the request which // made it, but having a timeout is still good. - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := httpServer.Shutdown(ctx); err != nil { @@ -212,7 +213,7 @@ func NewInformantServer( if server.madeContact { // only unregister the server if we could have plausibly contacted the informant unregisterName := fmt.Sprintf("InformantServer unregister (%s)", server.desc.AgentID) - runner.spawnBackgroundWorker(ctx, unregisterName, func(context.Context) { + runner.spawnBackgroundWorker(srv.GetBaseContext(ctx), unregisterName, func(context.Context) { // we want shutdown to (potentially) live longer than the request which // made it, but having a timeout is still good. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
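
Taken together, the series makes one change: the context is created once at the top of main (from a SIGTERM-aware signal.NotifyContext rather than a bare context.WithCancel), passed into MainRunner.Run, threaded from there into event handling and the informant HTTP handlers (util.AddHandler now hands r.Context() to each handler), while the shutdown/unregister background workers deliberately detach onto a fresh timeout-bounded context so they can outlive the request that triggered them. The sketch below is a minimal, standalone illustration of that shape using only the standard library; runExample, the /id handler, and the port number are hypothetical stand-ins, not code from this repository.

```go
// Minimal sketch of the context-plumbing pattern in this series (assumed
// names: runExample stands in for MainRunner.Run; the /id handler stands in
// for the informant endpoints).
package main

import (
	"context"
	"log"
	"net/http"
	"os/signal"
	"syscall"
	"time"
)

// runExample receives the caller's context instead of creating its own from
// context.Background(), mirroring the change to MainRunner.Run.
func runExample(ctx context.Context) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/id", func(w http.ResponseWriter, r *http.Request) {
		// Handlers work off the request context, as AddHandler now does by
		// passing r.Context() into handle().
		select {
		case <-r.Context().Done():
			http.Error(w, "request cancelled", http.StatusServiceUnavailable)
		default:
			w.Write([]byte(`{"ok": true}`)) //nolint:errcheck
		}
	})

	server := &http.Server{Addr: ":10301", Handler: mux}
	errs := make(chan error, 1)
	go func() { errs <- server.ListenAndServe() }()

	select {
	case <-ctx.Done():
		// Shutdown deliberately outlives the context that triggered it, but
		// stays bounded by its own timeout -- the same idea as the 10s
		// timeouts in the shutdown/unregister workers added in patch 2.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		return server.Shutdown(shutdownCtx)
	case err := <-errs:
		return err
	}
}

func main() {
	// As in cmd/autoscaler-agent/main.go after patch 5: the root context is
	// cancelled by SIGTERM instead of only by an explicit cancel().
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer cancel()

	if err := runExample(ctx); err != nil && err != http.ErrServerClosed {
		log.Fatalf("Main loop failed: %s", err)
	}
	log.Print("Main loop returned without issue. Exiting.")
}
```

Patch 5 additionally routes the detached workers through srv.GetBaseContext from github.com/tychoish/fun/srv so they hang off the process-lifetime context rather than the request-scoped one; the context.Background() calls in the sketch play the same role without that dependency.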