agent: plumb contexts through #59

Merged (6 commits) on Mar 1, 2023
Changes from 2 commits
9 changes: 7 additions & 2 deletions cmd/autoscaler-agent/main.go
@@ -1,6 +1,8 @@
package main

import (
"context"

"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
@@ -48,8 +50,11 @@ func main() {
VMClient: vmClient,
}

err = runner.Run()
if err != nil {
// TODO: add a signal handler here?
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err = runner.Run(ctx); err != nil {
klog.Fatalf("Main loop failed: %s", err)
}
klog.Info("Main loop returned without issue. Exiting.")
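The "TODO: add a signal handler here?" comment above has a natural answer once Run takes a context; a minimal sketch using signal.NotifyContext from os/signal (an assumption about future work, not part of this PR):

// Sketch only: replaces the WithCancel pair above. Requires importing
// "os/signal" and "syscall". signal.NotifyContext (Go 1.16+) returns a
// context that is cancelled when one of the listed signals arrives.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

if err = runner.Run(ctx); err != nil {
	klog.Fatalf("Main loop failed: %s", err)
}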
6 changes: 2 additions & 4 deletions pkg/agent/entrypoint.go
@@ -22,9 +22,7 @@ type MainRunner struct {
VMClient *vmclient.Clientset
}

func (r MainRunner) Run() error {
ctx := context.Background()

func (r MainRunner) Run(ctx context.Context) error {
podEvents := make(chan podEvent)

buildInfo := util.GetBuildInfo()
@@ -72,7 +70,7 @@ func (r MainRunner) Run() error {
globalState.Stop()
return nil
case event := <-podEvents:
globalState.handleEvent(event)
globalState.handleEvent(ctx, event)
Member:
This feels icky (handleEvent will spawn goroutines that use the context long after handleEvent itself returns, and we're always passing the same context to handleEvent) - but the only better solution is storing the context in globalstate itself.

Thoughts?

Contributor Author:
I don't think there's any problem with passing a context to a function that returns before the goroutines it spawns do (and indeed, having this context means that shutting down the agent's main loop will actually cause a full shutdown). Eventually the basecontext/shutdown work will help make some of this more manageable.

}
}
}
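The exchange above rests on a general property of contexts: a goroutine may outlive the call that spawned it as long as it watches the context it was handed, so cancelling the parent still reaches it. A self-contained sketch of the pattern (names hypothetical, not code from this PR):

package main

import (
	"context"
	"fmt"
	"time"
)

// handleEvent returns immediately, but the goroutine it spawns keeps
// running until the shared context is cancelled.
func handleEvent(ctx context.Context, event string) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("worker for %s stopping: %v\n", event, ctx.Err())
				return
			case <-time.After(100 * time.Millisecond):
				// periodic work for this event
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	handleEvent(ctx, "pod-a") // returns right away; the worker lives on
	time.Sleep(300 * time.Millisecond)
	cancel() // shutting down the main loop stops the worker too
	time.Sleep(50 * time.Millisecond)
}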
4 changes: 2 additions & 2 deletions pkg/agent/globalstate.go
@@ -57,7 +57,7 @@ func (s *agentState) Stop() {
}
}

func (s *agentState) handleEvent(event podEvent) {
func (s *agentState) handleEvent(ctx context.Context, event podEvent) {
klog.Infof("Handling pod event %+v", event)

state, hasPod := s.pods[event.podName]
@@ -77,7 +77,7 @@ func (s *agentState) handleEvent(event podEvent) {
return
}

runnerCtx, cancelRunnerContext := context.WithCancel(context.TODO())
runnerCtx, cancelRunnerContext := context.WithCancel(ctx)

status := &podStatus{
lock: sync.Mutex{},
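The one-line change above (context.TODO() to ctx as WithCancel's parent) is what puts each per-pod runner into the shutdown tree: cancelRunnerContext still tears down a single pod, but cancelling the agent-wide context now reaches every runner derived from it. Schematically (a sketch, not code from this PR):

agentCtx, cancelAgent := context.WithCancel(context.Background())

// One per pod, as in handleEvent:
runnerCtx, cancelRunner := context.WithCancel(agentCtx)

cancelAgent()                // agent-wide shutdown...
fmt.Println(runnerCtx.Err()) // ...propagates: prints "context canceled"
_ = cancelRunner             // still usable for single-pod teardown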
49 changes: 30 additions & 19 deletions pkg/agent/informant.go
@@ -89,7 +89,7 @@ type InformantServer struct {
// exit signals that the server should shut down, and sets exitStatus to status.
//
// This function MUST be called while holding runner.lock.
exit func(status InformantServerExitStatus)
exit func(ctx context.Context, status InformantServerExitStatus)
}

type InformantServerMode string
@@ -180,7 +180,7 @@ func NewInformantServer(
backgroundCtx, cancelBackground := context.WithCancel(ctx)

// note: docs for server.exit guarantee this function is called while holding runner.lock.
server.exit = func(status InformantServerExitStatus) {
server.exit = func(ctx context.Context, status InformantServerExitStatus) {
sendFinished.Send()
cancelBackground()

@@ -199,16 +199,26 @@

// we need to spawn these in separate threads so the caller doesn't block while holding
// runner.lock
runner.spawnBackgroundWorker(context.TODO(), shutdownName, func(c context.Context) {
if err := httpServer.Shutdown(c); err != nil {
runner.spawnBackgroundWorker(ctx, shutdownName, func(context.Context) {
// we want shutdown to (potentially) live longer than the request which
// made it, but having a timeout is still good.
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
Member:
20 seconds is a super long timeout compared to other things here (or: compared to our usual configuration of them). I'd either make this shorter (e.g. 5s), add a config field for it, or calculate it from an existing config field.

Contributor Author:
I mean, I think the question is more "how long could an HTTP request handler take to do a thing in normal operation?", and then double(ish) it.

The cap on this is about 30s in my mind (which is probably just what the timeout was in System V init scripts between SIGTERM and SIGKILL if the process doesn't die, and which has definitely been carried further into the future).

defer cancel()

if err := httpServer.Shutdown(ctx); err != nil {
runner.logger.Warningf("Error shutting down InformantServer: %s", err)
}
})
if server.madeContact {
// only unregister the server if we could have plausibly contacted the informant
unregisterName := fmt.Sprintf("InformantServer unregister (%s)", server.desc.AgentID)
runner.spawnBackgroundWorker(context.TODO(), unregisterName, func(c context.Context) {
if err := server.unregisterFromInformant(c); err != nil {
runner.spawnBackgroundWorker(ctx, unregisterName, func(context.Context) {
// we want shutdown to (potentially) live longer than the request which
// made it, but having a timeout is still good.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := server.unregisterFromInformant(ctx); err != nil {
Comment on lines +217 to +222
Member:
unregisterFromInformant already uses a timeout on the request itself; do we need another one?

Contributor Author:
I think the fact that doInformantRequest takes both a timeout and a context is a bit of a weird API, and I'd planned to pull that apart in a later PR, but I don't feel rushed about that.

It's plausible that we could just pass the enclosing context to the unregister call and not worry about it during shutdown, but having a timeout here is still good.

runner.logger.Warningf("Error unregistering %s: %s", server.desc.AgentID, err)
}
})
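Both review threads above turn on the same knob: how long shutdown-time work may outlive the request that triggered it. A sketch of the reviewer's config-field suggestion (the field name is hypothetical, not part of this PR):

// Hypothetical config field, e.g. InformantShutdownTimeoutSeconds uint.
timeout := time.Duration(config.InformantShutdownTimeoutSeconds) * time.Second

// Detach from the caller's context so shutdown can outlive the request
// that triggered it, but still bound the work with the configured timeout.
shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := httpServer.Shutdown(shutdownCtx); err != nil {
	runner.logger.Warningf("Error shutting down InformantServer: %s", err)
}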
@@ -382,7 +392,7 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context) error {
// We shouldn't *assume* that restarting will actually fix it though, so we'll still set
// RetryShouldFix = false.
if 400 <= statusCode && statusCode <= 599 {
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: err,
RetryShouldFix: false,
})
@@ -441,7 +451,7 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context) error {
err := errors.New("Protocol violation: Informant responded to /register with 200 without requesting /id")
s.setLastInformantError(err, true)
s.runner.logger.Errorf("%s", err)
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: err,
RetryShouldFix: false,
})
@@ -614,7 +624,7 @@ func (s *InformantServer) informantURL(path string) string {
// of that context.
//
// Returns: response body (if successful), status code, error (if unsuccessful)
func (s *InformantServer) handleID(body *struct{}) (*api.AgentIdentificationMessage, int, error) {
func (s *InformantServer) handleID(ctx context.Context, body *struct{}) (*api.AgentIdentificationMessage, int, error) {
s.runner.lock.Lock()
defer s.runner.lock.Unlock()

@@ -634,7 +644,7 @@ func (s *InformantServer) handleID(body *struct{}) (*api.AgentIdentificationMessage, int, error) {
// outside of that context.
//
// Returns: response body (if successful), status code, error (if unsuccessful)
func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentificationMessage, int, error) {
func (s *InformantServer) handleResume(ctx context.Context, body *api.ResumeAgent) (*api.AgentIdentificationMessage, int, error) {
if body.ExpectedID != s.desc.AgentID {
s.runner.logger.Warningf("AgentID %q not found, server has %q", body.ExpectedID, s.desc.AgentID)
return nil, 404, fmt.Errorf("AgentID %q not found", body.ExpectedID)
@@ -662,7 +672,7 @@ func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentificationMessage, int, error) {
s.runner.logger.Warningf("%s", internalErr)

// To be nice, we'll restart the server. We don't want to make a temporary error permanent.
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: internalErr,
RetryShouldFix: true,
})
@@ -673,7 +683,7 @@ func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentificationMessage, int, error) {
s.runner.logger.Warningf("%s", internalErr)

// To be nice, we'll restart the server. We don't want to make a temporary error permanent.
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: internalErr,
RetryShouldFix: true,
})
Expand All @@ -693,7 +703,7 @@ func (s *InformantServer) handleResume(body *api.ResumeAgent) (*api.AgentIdentif
// called outside of that context.
//
// Returns: response body (if successful), status code, error (if unsuccessful)
func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdentificationMessage, int, error) {
func (s *InformantServer) handleSuspend(ctx context.Context, body *api.SuspendAgent) (*api.AgentIdentificationMessage, int, error) {
if body.ExpectedID != s.desc.AgentID {
s.runner.logger.Warningf("AgentID %q not found, server has %q", body.ExpectedID, s.desc.AgentID)
return nil, 404, fmt.Errorf("AgentID %q not found", body.ExpectedID)
@@ -717,7 +727,7 @@ func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdentificationMessage, int, error) {
s.runner.logger.Warningf("%s", internalErr)

// To be nice, we'll restart the server. We don't want to make a temporary error permanent.
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: internalErr,
RetryShouldFix: true,
})
@@ -728,7 +738,7 @@ func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdentificationMessage, int, error) {
s.runner.logger.Warningf("%s", internalErr)

// To be nice, we'll restart the server. We don't want to make a temporary error permanent.
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: internalErr,
RetryShouldFix: true,
})
@@ -747,6 +757,7 @@ func (s *InformantServer) handleSuspend(body *api.SuspendAgent) (*api.AgentIdentificationMessage, int, error) {
//
// Returns: response body (if successful), status code, error (if unsuccessful)
func (s *InformantServer) handleTryUpscale(
ctx context.Context,
body *api.MoreResourcesRequest,
) (*api.AgentIdentificationMessage, int, error) {
if body.ExpectedID != s.desc.AgentID {
@@ -789,7 +800,7 @@ func (s *InformantServer) handleTryUpscale(
s.runner.logger.Warningf("%s", internalErr)

// To be nice, we'll restart the server. We don't want to make a temporary error permanent.
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: internalErr,
RetryShouldFix: true,
})
@@ -800,7 +811,7 @@ func (s *InformantServer) handleTryUpscale(
s.runner.logger.Warningf("%s", internalErr)

// To be nice, we'll restart the server. We don't want to make a temporary error permanent.
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: internalErr,
RetryShouldFix: true,
})
@@ -843,7 +854,7 @@ func (s *InformantServer) Downscale(ctx context.Context, to api.Resources) (*api
s.setLastInformantError(fmt.Errorf("Downscale request failed: %w", err), true)

if 400 <= statusCode && statusCode <= 599 {
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: err,
RetryShouldFix: statusCode == 404,
})
@@ -885,7 +896,7 @@ func (s *InformantServer) Upscale(ctx context.Context, to api.Resources) error {
s.setLastInformantError(fmt.Errorf("Upscale request failed: %w", err), true)

if 400 <= statusCode && statusCode <= 599 {
s.exit(InformantServerExitStatus{
s.exit(ctx, InformantServerExitStatus{
Err: err,
RetryShouldFix: statusCode == 404,
})
9 changes: 5 additions & 4 deletions pkg/informant/endpoints.go
@@ -3,6 +3,7 @@ package informant
// This file contains the high-level handlers for various HTTP endpoints

import (
"context"
"errors"
"fmt"
"sync"
@@ -95,7 +96,7 @@ func WithCgroup(cgm *CgroupManager, config CgroupConfig) NewStateOpts {
// RegisterAgent registers a new or updated autoscaler-agent
//
// Returns: body (if successful), status code, error (if unsuccessful)
func (s *State) RegisterAgent(info *api.AgentDesc) (*api.InformantDesc, int, error) {
func (s *State) RegisterAgent(ctx context.Context, info *api.AgentDesc) (*api.InformantDesc, int, error) {
protoVersion, status, err := s.agents.RegisterNewAgent(info)
if err != nil {
return nil, status, err
@@ -115,7 +116,7 @@ func (s *State) RegisterAgent(info *api.AgentDesc) (*api.InformantDesc, int, error) {
// amount is ok
//
// Returns: body (if successful), status code and error (if unsuccessful)
func (s *State) TryDownscale(target *api.RawResources) (*api.DownscaleResult, int, error) {
func (s *State) TryDownscale(ctx context.Context, target *api.RawResources) (*api.DownscaleResult, int, error) {
// If we aren't interacting with a cgroup, then we don't need to do anything.
if s.cgroup == nil {
return &api.DownscaleResult{Ok: true, Status: "No action taken (no cgroup enabled)"}, 200, nil
@@ -173,7 +174,7 @@ func (s *State) TryDownscale(target *api.RawResources) (*api.DownscaleResult, int, error) {
// NotifyUpscale signals that the VM's resource usage has been increased to the new amount
//
// Returns: body (if successful), status code and error (if unsuccessful)
func (s *State) NotifyUpscale(newResources *api.RawResources) (*struct{}, int, error) {
func (s *State) NotifyUpscale(ctx context.Context, newResources *api.RawResources) (*struct{}, int, error) {
// FIXME: we shouldn't just trust what the agent says
//
// Because of race conditions like in <https://github.com/neondatabase/autoscaling/issues/23>,
@@ -216,7 +217,7 @@ func (s *State) NotifyUpscale(newResources *api.RawResources) (*struct{}, int, error) {
// If a different autoscaler-agent is currently registered, this method will do nothing.
//
// Returns: body (if successful), status code and error (if unsuccessful)
func (s *State) UnregisterAgent(info *api.AgentDesc) (*api.UnregisterAgent, int, error) {
func (s *State) UnregisterAgent(ctx context.Context, info *api.AgentDesc) (*api.UnregisterAgent, int, error) {
agent, ok := s.agents.Get(info.AgentID)
if !ok {
return nil, 404, fmt.Errorf("No agent with ID %q", info.AgentID)
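In this commit the State methods accept ctx without consuming it; the change just makes it available. One way a handler might use it later, sketched as an assumption rather than anything in this PR:

// Hypothetical guard at the top of a handler like TryDownscale:
if err := ctx.Err(); err != nil {
	// The agent hung up; no point adjusting the cgroup now.
	return nil, 500, fmt.Errorf("request cancelled: %w", err)
}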
5 changes: 3 additions & 2 deletions pkg/util/handle.go
@@ -3,6 +3,7 @@ package util
// Wrapper file for the AddHandler function

import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -22,7 +23,7 @@ func AddHandler[T any, R any](
endpoint string,
method string,
reqTypeName string,
handle func(*T) (_ *R, statusCode int, _ error),
handle func(context.Context, *T) (_ *R, statusCode int, _ error),
) {
errBadMethod := []byte("request method must be " + method)

@@ -44,7 +45,7 @@

klog.Infof("%sReceived request on %s (client = %s) %+v", logPrefix, endpoint, r.RemoteAddr, req)

resp, status, err := handle(&req)
resp, status, err := handle(r.Context(), &req)

if err == nil && status != http.StatusOK {
err = errors.New("HTTP handler error: status != 200 OK, but no error message")
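With r.Context() threaded through, any func(context.Context, *T) (*R, int, error) plugs into AddHandler directly, including the State methods updated above. A usage sketch (the leading logPrefix/mux parameters and the endpoint paths are assumptions; they sit outside this diff):

mux := http.NewServeMux()
util.AddHandler("informant: ", mux, "/register", http.MethodPost, "AgentDesc", state.RegisterAgent)
util.AddHandler("informant: ", mux, "/downscale", http.MethodPost, "RawResources", state.TryDownscale)
util.AddHandler("informant: ", mux, "/upscale", http.MethodPost, "RawResources", state.NotifyUpscale)
util.AddHandler("informant: ", mux, "/unregister", http.MethodDelete, "AgentDesc", state.UnregisterAgent)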