From 0faed631e467b4a25744a091892940513045aec4 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Mon, 3 Jul 2023 07:40:48 -0700 Subject: [PATCH] agent: Rewrite scaling logic into pkg/agent/{core,executor} At a very high level, this work replaces (*Runner).handleVMResources(), moving from an imperative style to an explicit state machine. This new version is more complicated, but ultimately more flexible and easier to extend. The decision-making "core" of the scaling logic is implemented by (*core.State).NextActions(), which returns an ActionSet indicating what the "caller" should do. NextActions() is a pure function, making this easier to test - at least, in theory. That method is called and cached by executor.ExecutorCore, where there's a few different threads (each defined in exec_*.go) responsible for implementing the communications with the other components - namely, the scheduler plugin, vm-informant, and NeonVM k8s API. The various "executor" threads are written generically, using dedicated interfaces (e.g. PluginInterface / PluginHandle) that are implemented in pkg/agent/execbridge.go. --- .golangci.yml | 8 + deploy/agent/config_map.yaml | 3 + pkg/agent/config.go | 48 +- pkg/agent/core/action.go | 40 + pkg/agent/core/dumpstate.go | 142 ++++ pkg/agent/core/state.go | 710 +++++++++++++++++ pkg/agent/execbridge.go | 200 +++++ pkg/agent/executor/core.go | 155 ++++ pkg/agent/executor/exec_informant.go | 266 +++++++ pkg/agent/executor/exec_neonvm.go | 94 +++ pkg/agent/executor/exec_plugin.go | 138 ++++ pkg/agent/executor/exec_sleeper.go | 68 ++ pkg/agent/globalstate.go | 17 +- pkg/agent/informant.go | 164 ++-- pkg/agent/runner.go | 1052 +++----------------------- pkg/plugin/config.go | 2 + pkg/util/broadcast.go | 72 ++ pkg/util/watch/watch.go | 2 + 18 files changed, 2136 insertions(+), 1045 deletions(-) create mode 100644 pkg/agent/core/action.go create mode 100644 pkg/agent/core/dumpstate.go create mode 100644 pkg/agent/core/state.go create mode 100644 pkg/agent/execbridge.go create mode 100644 pkg/agent/executor/core.go create mode 100644 pkg/agent/executor/exec_informant.go create mode 100644 pkg/agent/executor/exec_neonvm.go create mode 100644 pkg/agent/executor/exec_plugin.go create mode 100644 pkg/agent/executor/exec_sleeper.go create mode 100644 pkg/util/broadcast.go diff --git a/.golangci.yml b/.golangci.yml index b6b093edc..40fa640ae 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -6,6 +6,11 @@ run: skip-dirs: - neonvm +issues: + exclude: + # ChanMutex contains only a channel, which *is* safe to copy + - 'copylocks: return copies lock value: github\.com/neondatabase/autoscaling/pkg/util\.ChanMutex' + output: format: colored-line-number print-issued-lines: true @@ -56,8 +61,11 @@ linters-settings: - '^github.com/prometheus/client_golang/prometheus(/.*)?\.\w+Opts$' - '^github\.com/containerd/cgroups/v3/cgroup2\.(Resources|Memory)' - '^github\.com/tychoish/fun/pubsub\.BrokerOptions$' + - '^github\.com/neondatabase/autoscaling/pkg/util\.JSONPatch$' + - '^github\.com/neondatabase/autoscaling/pkg/util/watch\.HandlerFuncs$' # vmapi.{VirtualMachine,VirtualMachineSpec,VirtualMachineMigration,VirtualMachineMigrationSpec} - '^github\.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1\.VirtualMachine(Migration)?(Spec)?$' + - '^github\.com/neondatabase/autoscaling/pkg/agent/core\.Action$' # see: gci: diff --git a/deploy/agent/config_map.yaml b/deploy/agent/config_map.yaml index 125f0d0aa..280f8d2b5 100644 --- a/deploy/agent/config_map.yaml +++ b/deploy/agent/config_map.yaml @@ -16,6 +16,8 @@ 
data: "serverPort": 10301, "retryServerMinWaitSeconds": 5, "retryServerNormalWaitSeconds": 5, + "retryDeniedDownscaleSeconds": 5, + "retryFailedRequestSeconds": 3, "registerRetrySeconds": 5, "requestTimeoutSeconds": 1, "registerTimeoutSeconds": 2, @@ -31,6 +33,7 @@ data: "scheduler": { "schedulerName": "autoscale-scheduler", "requestTimeoutSeconds": 2, + "requestAtLeastEverySeconds": 5, "requestPort": 10299 }, "dumpState": { diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 7ac480782..8ab7d0329 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -12,12 +12,12 @@ import ( ) type Config struct { + DumpState *DumpStateConfig `json:"dumpState"` Scaling ScalingConfig `json:"scaling"` Informant InformantConfig `json:"informant"` Metrics MetricsConfig `json:"metrics"` Scheduler SchedulerConfig `json:"scheduler"` Billing *billing.Config `json:"billing,omitempty"` - DumpState *DumpStateConfig `json:"dumpState"` } // DumpStateConfig configures the endpoint to dump all internal state @@ -54,6 +54,13 @@ type InformantConfig struct { // register request. RegisterRetrySeconds uint `json:"registerRetrySeconds"` + // RetryFailedRequestSeconds gives the duration, in seconds, that we must wait before retrying a + // request that previously failed. + RetryFailedRequestSeconds uint `json:"retryFailedRequestSeconds"` + // RetryDeniedDownscaleSeconds gives the duration, in seconds, that we must wait before retrying + // a downscale request that was previously denied + RetryDeniedDownscaleSeconds uint `json:"retryDeniedDownscaleSeconds"` + // RequestTimeoutSeconds gives the timeout for any individual request to the informant, except // for those with separately-defined values below. RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"` @@ -98,6 +105,9 @@ type SchedulerConfig struct { // // If zero, requests will have no timeout. RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"` + // RequestAtLeastEverySeconds gives the maximum duration we should go without attempting a + // request to the scheduler, even if nothing's changed. 
+ RequestAtLeastEverySeconds uint `json:"requestAtLeastEverySeconds"` // RequestPort defines the port to access the scheduler's ✨special✨ API with RequestPort uint16 `json:"requestPort"` } @@ -131,31 +141,35 @@ func (c *Config) validate() error { zeroTmpl = "field %q cannot be zero" ) - erc.Whenf(ec, c.Billing != nil && c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName") - erc.Whenf(ec, c.Billing != nil && c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName") - erc.Whenf(ec, c.Billing != nil && c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds") - erc.Whenf(ec, c.Billing != nil && c.Billing.PushEverySeconds == 0, zeroTmpl, ".billing.pushEverySeconds") - erc.Whenf(ec, c.Billing != nil && c.Billing.PushTimeoutSeconds == 0, zeroTmpl, ".billing.pushTimeoutSeconds") - erc.Whenf(ec, c.Billing != nil && c.Billing.URL == "", emptyTmpl, ".billing.url") erc.Whenf(ec, c.DumpState != nil && c.DumpState.Port == 0, zeroTmpl, ".dumpState.port") erc.Whenf(ec, c.DumpState != nil && c.DumpState.TimeoutSeconds == 0, zeroTmpl, ".dumpState.timeoutSeconds") - erc.Whenf(ec, c.Informant.DownscaleTimeoutSeconds == 0, zeroTmpl, ".informant.downscaleTimeoutSeconds") - erc.Whenf(ec, c.Informant.RegisterRetrySeconds == 0, zeroTmpl, ".informant.registerRetrySeconds") - erc.Whenf(ec, c.Informant.RegisterTimeoutSeconds == 0, zeroTmpl, ".informant.registerTimeoutSeconds") - erc.Whenf(ec, c.Informant.RequestTimeoutSeconds == 0, zeroTmpl, ".informant.requestTimeoutSeconds") + erc.Whenf(ec, c.Scaling.RequestTimeoutSeconds == 0, zeroTmpl, ".scaling.requestTimeoutSeconds") + // add all errors if there are any: https://github.com/neondatabase/autoscaling/pull/195#discussion_r1170893494 + ec.Add(c.Scaling.DefaultConfig.Validate()) + erc.Whenf(ec, c.Informant.ServerPort == 0, zeroTmpl, ".informant.serverPort") erc.Whenf(ec, c.Informant.RetryServerMinWaitSeconds == 0, zeroTmpl, ".informant.retryServerMinWaitSeconds") erc.Whenf(ec, c.Informant.RetryServerNormalWaitSeconds == 0, zeroTmpl, ".informant.retryServerNormalWaitSeconds") - erc.Whenf(ec, c.Informant.ServerPort == 0, zeroTmpl, ".informant.serverPort") + erc.Whenf(ec, c.Informant.RegisterRetrySeconds == 0, zeroTmpl, ".informant.registerRetrySeconds") + erc.Whenf(ec, c.Informant.RetryFailedRequestSeconds == 0, zeroTmpl, ".informant.retryFailedRequestSeconds") + erc.Whenf(ec, c.Informant.RetryDeniedDownscaleSeconds == 0, zeroTmpl, ".informant.retryDeniedDownscaleSeconds") + erc.Whenf(ec, c.Informant.RequestTimeoutSeconds == 0, zeroTmpl, ".informant.requestTimeoutSeconds") + erc.Whenf(ec, c.Informant.RegisterTimeoutSeconds == 0, zeroTmpl, ".informant.registerTimeoutSeconds") + erc.Whenf(ec, c.Informant.DownscaleTimeoutSeconds == 0, zeroTmpl, ".informant.downscaleTimeoutSeconds") erc.Whenf(ec, c.Informant.UnhealthyAfterSilenceDurationSeconds == 0, zeroTmpl, ".informant.unhealthyAfterSilenceDurationSeconds") erc.Whenf(ec, c.Informant.UnhealthyStartupGracePeriodSeconds == 0, zeroTmpl, ".informant.unhealthyStartupGracePeriodSeconds") erc.Whenf(ec, c.Metrics.LoadMetricPrefix == "", emptyTmpl, ".metrics.loadMetricPrefix") + erc.Whenf(ec, c.Metrics.RequestTimeoutSeconds == 0, zeroTmpl, ".metrics.requestTimeoutSeconds") erc.Whenf(ec, c.Metrics.SecondsBetweenRequests == 0, zeroTmpl, ".metrics.secondsBetweenRequests") - erc.Whenf(ec, c.Scaling.RequestTimeoutSeconds == 0, zeroTmpl, ".scaling.requestTimeoutSeconds") - // add all errors if there are any: 
https://github.com/neondatabase/autoscaling/pull/195#discussion_r1170893494 - ec.Add(c.Scaling.DefaultConfig.Validate()) - erc.Whenf(ec, c.Scheduler.RequestPort == 0, zeroTmpl, ".scheduler.requestPort") - erc.Whenf(ec, c.Scheduler.RequestTimeoutSeconds == 0, zeroTmpl, ".scheduler.requestTimeoutSeconds") erc.Whenf(ec, c.Scheduler.SchedulerName == "", emptyTmpl, ".scheduler.schedulerName") + // note: c.Scheduler.RequestTimeoutSeconds == 0 is valid + erc.Whenf(ec, c.Scheduler.RequestAtLeastEverySeconds == 0, zeroTmpl, ".scheduler.requestAtLeastEverySeconds") + erc.Whenf(ec, c.Scheduler.RequestPort == 0, zeroTmpl, ".scheduler.requestPort") + erc.Whenf(ec, c.Billing != nil && c.Billing.URL == "", emptyTmpl, ".billing.url") + erc.Whenf(ec, c.Billing != nil && c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName") + erc.Whenf(ec, c.Billing != nil && c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName") + erc.Whenf(ec, c.Billing != nil && c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds") + erc.Whenf(ec, c.Billing != nil && c.Billing.PushEverySeconds == 0, zeroTmpl, ".billing.pushEverySeconds") + erc.Whenf(ec, c.Billing != nil && c.Billing.PushTimeoutSeconds == 0, zeroTmpl, ".billing.pushTimeoutSeconds") return ec.Resolve() } diff --git a/pkg/agent/core/action.go b/pkg/agent/core/action.go new file mode 100644 index 000000000..990064d62 --- /dev/null +++ b/pkg/agent/core/action.go @@ -0,0 +1,40 @@ +package core + +import ( + "time" + + "github.com/neondatabase/autoscaling/pkg/api" +) + +type ActionSet struct { + Wait *ActionWait `json:"wait,omitempty"` + PluginRequest *ActionPluginRequest `json:"pluginRequest,omitempty"` + NeonVMRequest *ActionNeonVMRequest `json:"neonvmRequest,omitempty"` + InformantDownscale *ActionInformantDownscale `json:"informantDownscale,omitempty"` + InformantUpscale *ActionInformantUpscale `json:"informantUpscale,omitempty"` +} + +type ActionWait struct { + Duration time.Duration `json:"duration"` +} + +type ActionPluginRequest struct { + LastPermit *api.Resources `json:"current"` + Target api.Resources `json:"target"` + Metrics *api.Metrics `json:"metrics"` +} + +type ActionNeonVMRequest struct { + Current api.Resources `json:"current"` + Target api.Resources `json:"target"` +} + +type ActionInformantDownscale struct { + Current api.Resources `json:"current"` + Target api.Resources `json:"target"` +} + +type ActionInformantUpscale struct { + Current api.Resources `json:"current"` + Target api.Resources `json:"target"` +} diff --git a/pkg/agent/core/dumpstate.go b/pkg/agent/core/dumpstate.go new file mode 100644 index 000000000..b36874a6a --- /dev/null +++ b/pkg/agent/core/dumpstate.go @@ -0,0 +1,142 @@ +package core + +// Implementation of (*UpdateState).Dump() + +import ( + "time" + + "github.com/neondatabase/autoscaling/pkg/api" +) + +func shallowCopy[T any](ptr *T) *T { + if ptr == nil { + return nil + } else { + x := *ptr + return &x + } +} + +// StateDump provides introspection into the current values of the fields of State +type StateDump struct { + Config Config `json:"config"` + VM api.VmInfo `json:"vm"` + Plugin pluginStateDump `json:"plugin"` + Informant informantStateDump `json:"informant"` + NeonVM neonvmStateDump `json:"neonvm"` + Metrics *api.Metrics `json:"metrics"` +} + +// Dump produces a JSON-serializable representation of the State +func (s *State) Dump() StateDump { + return StateDump{ + Config: s.config, + VM: s.vm, + Plugin: s.plugin.dump(), + Informant: s.informant.dump(), + 
NeonVM: s.neonvm.dump(), + Metrics: shallowCopy(s.metrics), + } +} + +type pluginStateDump struct { + Alive bool `json:"alive"` + OngoingRequest bool `json:"ongoingRequest"` + ComputeUnit *api.Resources `json:"computeUnit"` + LastRequest *pluginRequestedDump `json:"lastRequest"` + Permit *api.Resources `json:"permit"` +} +type pluginRequestedDump struct { + At time.Time `json:"time"` + Resources api.Resources `json:"resources"` +} + +func (s *pluginState) dump() pluginStateDump { + var lastRequest *pluginRequestedDump + if s.lastRequest != nil { + lastRequest = &pluginRequestedDump{ + At: s.lastRequest.at, + Resources: s.lastRequest.resources, + } + } + + return pluginStateDump{ + Alive: s.alive, + OngoingRequest: s.ongoingRequest, + ComputeUnit: shallowCopy(s.computeUnit), + LastRequest: lastRequest, + Permit: shallowCopy(s.permit), + } +} + +type informantStateDump struct { + Active bool `json:"active"` + OngoingRequest *OngoingInformantRequestDump `json:"ongoingRequest"` + RequestedUpscale *requestedUpscaleDump `json:"requestedUpscale"` + DeniedDownscale *deniedDownscaleDump `json:"deniedDownscale"` + Approved *api.Resources `json:"approved"` + DownscaleFailureAt *time.Time `json:"downscaleFailureAt"` + UpscaleFailureAt *time.Time `json:"upscaleFailureAt"` +} +type OngoingInformantRequestDump struct { + Kind informantRequestKind `json:"kind"` +} +type requestedUpscaleDump struct { + At time.Time `json:"at"` + Base api.Resources `json:"base"` + Requested api.MoreResources `json:"requested"` +} +type deniedDownscaleDump struct { + At time.Time `json:"at"` + Requested api.Resources `json:"requested"` +} + +func (s *informantState) dump() informantStateDump { + var requestedUpscale *requestedUpscaleDump + if s.requestedUpscale != nil { + requestedUpscale = &requestedUpscaleDump{ + At: s.requestedUpscale.at, + Base: s.requestedUpscale.base, + Requested: s.requestedUpscale.requested, + } + } + + var deniedDownscale *deniedDownscaleDump + if s.deniedDownscale != nil { + deniedDownscale = &deniedDownscaleDump{ + At: s.deniedDownscale.at, + Requested: s.deniedDownscale.requested, + } + } + + var ongoingRequest *OngoingInformantRequestDump + if s.ongoingRequest != nil { + ongoingRequest = &OngoingInformantRequestDump{ + Kind: s.ongoingRequest.kind, + } + } + + return informantStateDump{ + Active: s.active, + OngoingRequest: ongoingRequest, + RequestedUpscale: requestedUpscale, + DeniedDownscale: deniedDownscale, + Approved: shallowCopy(s.approved), + DownscaleFailureAt: shallowCopy(s.downscaleFailureAt), + UpscaleFailureAt: shallowCopy(s.upscaleFailureAt), + } +} + +type neonvmStateDump struct { + LastSuccess *api.Resources `json:"lastSuccess"` + OngoingRequested *api.Resources `json:"ongoingRequested"` + RequestFailedAt *time.Time `json:"requestFailedAt"` +} + +func (s *neonvmState) dump() neonvmStateDump { + return neonvmStateDump{ + LastSuccess: shallowCopy(s.lastSuccess), + OngoingRequested: shallowCopy(s.ongoingRequested), + RequestFailedAt: shallowCopy(s.requestFailedAt), + } +} diff --git a/pkg/agent/core/state.go b/pkg/agent/core/state.go new file mode 100644 index 000000000..287a92a48 --- /dev/null +++ b/pkg/agent/core/state.go @@ -0,0 +1,710 @@ +package core + +// The core scaling logic at the heart of the autoscaler-agent. This file implements everything with +// mostly pure-ish functions, so that all the making & receiving requests can be done elsewhere. +// +// Broadly our strategy is to mimic the kind of eventual consistency that is itself used in +// Kubernetes. 
The scaling logic wasn't always implemented like this, but because the +// autoscaler-agent *fundamentally* exists in an eventual consistency world, we have to either: +// (a) make assumptions that we know are false; or +// (b) design our system so it assumes less. +// We used to solve this by (a). We ran into¹ issues² going that way, because sometimes those false +// assumptions come back to haunt you. +// +// That said, there's still some tricky semantics we want to maintain. Internally, the +// autoscaler-agent must be designed around eventual consistency, but the API we expose to the +// vm-informant is strictly synchronous. As such, there's some subtle logic to make sure that we're +// not violating our own guarantees. +// +// --- +// ¹ https://github.com/neondatabase/autoscaling/issues/23 +// ² https://github.com/neondatabase/autoscaling/issues/350 + +import ( + "fmt" + "math" + "strings" + "time" + + "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util" +) + +// Config represents some of the static configuration underlying the decision-making of State +type Config struct { + // DefaultScalingConfig is just copied from the global autoscaler-agent config. + // If the VM's ScalingConfig is nil, we use this field instead. + DefaultScalingConfig api.ScalingConfig + + // PluginRequestTick gives the period at which we should be making requests to the scheduler + // plugin, even if nothing's changed. + PluginRequestTick time.Duration + + // InformantDeniedDownscaleCooldown gives the time we must wait between making duplicate + // downscale requests to the vm-informant where the previous failed. + InformantDeniedDownscaleCooldown time.Duration + + // InformantRetryWait gives the amount of time to wait to retry after a *failed* request. + InformantRetryWait time.Duration + + // Warn provides an outlet for (*State).NextActions() to give warnings about conditions that are + // impeding its ability to execute. (e.g. "wanted to do X but couldn't because of Y") + Warn func(string, ...any) `json:"-"` +} + +// State holds all of the necessary internal state for a VM in order to make scaling +// decisions +type State struct { + // ANY CHANGED FIELDS MUST BE UPDATED IN dumpstate.go AS WELL + + config Config + + // vm gives the current state of the VM - or at least, the state of the fields we care about. + // + // NB: any contents behind pointers in vm are immutable. Any time the field is updated, we + // replace it with a fresh object. + vm api.VmInfo + + // plugin records all state relevant to communications with the scheduler plugin + plugin pluginState + + // informant records all state relevant to communications with the vm-informant + informant informantState + + // neonvm records all state relevant to the NeonVM k8s API + neonvm neonvmState + + metrics *api.Metrics +} + +type pluginState struct { + alive bool + // ongoingRequest is true iff there is currently an ongoing request to *this* scheduler plugin. + ongoingRequest bool + // computeUnit, if not nil, gives the value of the compute unit we most recently got from a + // PluginResponse + computeUnit *api.Resources + // lastRequest, if not nil, gives information about the most recently started request to the + // plugin (maybe unfinished!) + lastRequest *pluginRequested + // permit, if not nil, stores the Permit in the most recent PluginResponse. This field will be + // nil if we have not been able to contact *any* scheduler. If we switch schedulers, we trust + // the old one.
+ permit *api.Resources +} + +type pluginRequested struct { + at time.Time + resources api.Resources +} + +type informantState struct { + // active is true iff the agent is currently "confirmed" and not "suspended" by the informant. + // Otherwise, we shouldn't be making any kind of scaling requests. + active bool + + ongoingRequest *ongoingInformantRequest + + // requestedUpscale, if not nil, stores the most recent *unresolved* upscaling requested by the + // vm-informant, along with the time at which it occurred. + requestedUpscale *requestedUpscale + + // deniedDownscale, if not nil, stores the result of the latest denied /downscale request. + deniedDownscale *deniedDownscale + + // approved stores the most recent Resources associated with either (a) an accepted downscale + // request, or (b) a successful upscale notification. + approved *api.Resources + + downscaleFailureAt *time.Time + upscaleFailureAt *time.Time +} + +type ongoingInformantRequest struct { + kind informantRequestKind +} + +type informantRequestKind string + +const ( + informantRequestKindDownscale informantRequestKind = "downscale" + informantRequestKindUpscale informantRequestKind = "upscale" +) + +type requestedUpscale struct { + at time.Time + base api.Resources + requested api.MoreResources +} + +type deniedDownscale struct { + at time.Time + requested api.Resources +} + +type neonvmState struct { + lastSuccess *api.Resources + // ongoingRequested, if not nil, gives the resources requested + ongoingRequested *api.Resources + requestFailedAt *time.Time +} + +func NewState(vm api.VmInfo, config Config) *State { + return &State{ + config: config, + vm: vm, + plugin: pluginState{ + alive: false, + ongoingRequest: false, + computeUnit: nil, + lastRequest: nil, + permit: nil, + }, + informant: informantState{ + active: false, + ongoingRequest: nil, + requestedUpscale: nil, + deniedDownscale: nil, + approved: nil, + downscaleFailureAt: nil, + upscaleFailureAt: nil, + }, + neonvm: neonvmState{ + lastSuccess: nil, + ongoingRequested: nil, + requestFailedAt: nil, + }, + metrics: nil, + } +} + +// NextActions is used to implement the state machine. It's a pure function that *just* indicates +// what the executor should do. +func (s *State) NextActions(now time.Time) ActionSet { + var actions ActionSet + + using := s.vm.Using() + + var desiredResources api.Resources + + if s.informant.active { + desiredResources = s.desiredResourcesFromMetricsOrRequestedUpscaling() + } else { + // If we're not deemed "active" by the informant, then we shouldn't be making any kind of + // scaling requests on its behalf. + // + // We'll still talk to the scheduler to inform it about the current resource usage though, + // to mitigate any reliability issues - much of the informant is built (as of 2023-07-09) + // under the assumption that we could, in theory, have multiple autoscaler-agents on the + // same node at the same time. That's... not really true, so an informant that isn't + // "active" is more likely to just be crash-looping due to a bug. + // + // *In theory* if we had multiple autoscaler-agents talking to a single informant, this + // would be incorrect; we'd override another one's scaling requests. But this should be + // fine.
+ desiredResources = using + } + + desiredResourcesApprovedByInformant := s.boundResourcesByInformantApproved(desiredResources) + desiredResourcesApprovedByPlugin := s.boundResourcesByPluginApproved(desiredResources) + // NB: informant approved provides a lower bound + approvedDesiredResources := desiredResourcesApprovedByPlugin.Max(desiredResourcesApprovedByInformant) + + ongoingNeonVMRequest := s.neonvm.ongoingRequested != nil + + var requestForPlugin api.Resources + if s.plugin.permit == nil { + // If we haven't yet gotten a proper plugin response, then we aren't allowed to ask for + // anything beyond our current usage. + requestForPlugin = using + } else { + // ... Otherwise, we should: + // 1. "inform" the plugin of any downscaling since the previous permit + // 2. "request" any desired upscaling relative to the previous permit + // with (2) taking priority over (1), if there are any conflicts. + requestForPlugin = desiredResources.Max(using) // ignore "desired" downscaling with .Max(using) + } + + // We want to make a request to the scheduler plugin if: + // 1. we've waited long enough since the previous request; or + // 2.a. we want to request resources / inform it of downscale; and + // b. there isn't any ongoing, conflicting request + timeForNewPluginRequest := s.plugin.lastRequest == nil || now.Sub(s.plugin.lastRequest.at) >= s.config.PluginRequestTick + shouldUpdatePlugin := s.plugin.lastRequest != nil && + // "we haven't tried requesting *these* resources from it yet, or we can retry requesting" + (s.plugin.lastRequest.resources != requestForPlugin || timeForNewPluginRequest) && + !ongoingNeonVMRequest + + if !s.plugin.ongoingRequest && (timeForNewPluginRequest || shouldUpdatePlugin) && s.plugin.alive { + if !shouldUpdatePlugin { + // If we shouldn't "update" the plugin, then just inform it about the current resources + // and metrics. + actions.PluginRequest = &ActionPluginRequest{ + LastPermit: s.plugin.permit, + Target: using, + Metrics: s.metrics, + } + } else { + // ... Otherwise, we should try requesting something new from it. + actions.PluginRequest = &ActionPluginRequest{ + LastPermit: s.plugin.permit, + Target: desiredResourcesApprovedByInformant, + Metrics: s.metrics, + } + } + } else if timeForNewPluginRequest || shouldUpdatePlugin { + if s.plugin.alive { + s.config.Warn("Wanted to make a request to the plugin, but there's already one ongoing") + } else { + s.config.Warn("Wanted to make a request to the plugin, but there isn't one active right now") + } + } + + // We want to make a request to NeonVM if we've been approved for a change in resources that + // we're not currently using. + if approvedDesiredResources != using { + // ... but we can't make one if there's already a request ongoing, either via the NeonVM API + // or to the scheduler plugin, because they require taking out the request lock.
+ if !ongoingNeonVMRequest && !s.plugin.ongoingRequest { + actions.NeonVMRequest = &ActionNeonVMRequest{ + Current: using, + Target: approvedDesiredResources, + } + } else { + var reqs []string + if s.plugin.ongoingRequest { + reqs = append(reqs, "plugin request") + } + if ongoingNeonVMRequest && *s.neonvm.ongoingRequested != approvedDesiredResources { + reqs = append(reqs, "NeonVM request (for different resources)") + } + + if len(reqs) != 0 { + s.config.Warn("Wanted to make a request to NeonVM API, but there's already %s ongoing", strings.Join(reqs, " and ")) + } + } + } + + // We should make an upscale request to the informant if we've upscaled and the informant + // doesn't know about it. + wantInformantUpscaleRequest := s.informant.approved != nil && *s.informant.approved != desiredResources.Max(*s.informant.approved) + // However, we may need to wait before retrying (or for any ongoing requests to finish) + makeInformantUpscaleRequest := wantInformantUpscaleRequest && + s.informant.active && + s.informant.ongoingRequest == nil && + (s.informant.upscaleFailureAt == nil || + now.Sub(*s.informant.upscaleFailureAt) >= s.config.InformantRetryWait) + if wantInformantUpscaleRequest { + if makeInformantUpscaleRequest { + actions.InformantUpscale = &ActionInformantUpscale{ + Current: *s.informant.approved, + Target: desiredResources.Max(*s.informant.approved), + } + } else if !s.informant.active { + s.config.Warn("Wanted to send informant upscale request, but not active") + } else if s.informant.ongoingRequest != nil && s.informant.ongoingRequest.kind != informantRequestKindUpscale { + s.config.Warn("Wanted to send informant upscale request, but waiting other ongoing %s request", s.informant.ongoingRequest.kind) + } else if s.informant.ongoingRequest == nil { + s.config.Warn("Wanted to send informant upscale request, but waiting on retry rate limit") + } + } + + // We should make a downscale request to the informant if we want to downscale but haven't been + // approved for it. 
+ var resourcesForInformantDownscale api.Resources + if s.informant.approved != nil { + resourcesForInformantDownscale = desiredResources.Min(*s.informant.approved) + } else { + resourcesForInformantDownscale = desiredResources.Min(using) + } + wantInformantDownscaleRequest := s.informant.approved != nil && *s.informant.approved != resourcesForInformantDownscale + if s.informant.approved == nil && resourcesForInformantDownscale != using { + s.config.Warn("Wanted to send informant downscale request, but haven't yet gotten information about its resources") + } + // However, we may need to wait before retrying (or for any ongoing requests to finish) + makeInformantDownscaleRequest := wantInformantDownscaleRequest && + s.informant.active && + s.informant.ongoingRequest == nil && + (s.informant.deniedDownscale == nil || + s.informant.deniedDownscale.requested != desiredResources.Min(using) || + now.Sub(s.informant.deniedDownscale.at) >= s.config.InformantDeniedDownscaleCooldown) && + (s.informant.downscaleFailureAt == nil || + now.Sub(*s.informant.downscaleFailureAt) >= s.config.InformantRetryWait) + + if wantInformantDownscaleRequest { + if makeInformantDownscaleRequest { + actions.InformantDownscale = &ActionInformantDownscale{ + Current: *s.informant.approved, + Target: resourcesForInformantDownscale, + } + } else if !s.informant.active { + s.config.Warn("Wanted to send informant downscale request, but not active") + } else if s.informant.ongoingRequest != nil && s.informant.ongoingRequest.kind != informantRequestKindDownscale { + s.config.Warn("Wanted to send informant downscale request, but waiting on other ongoing %s request", s.informant.ongoingRequest.kind) + } else if s.informant.ongoingRequest == nil { + s.config.Warn("Wanted to send informant downscale request, but waiting on retry rate limit") + } + } + + // --- and that's all the request types! --- + + // If there's anything waiting, we should also note how long we should wait for. + // There's two components we could be waiting on: the scheduler plugin, and the vm-informant. + maximumDuration := time.Duration(int64(uint64(1)<<63 - 1)) + requiredWait := maximumDuration + + // We always need to periodically send messages to the plugin. If actions.PluginRequest == nil, + // we know that either: + // + // (a) s.plugin.lastRequestAt != nil (otherwise timeForNewPluginRequest == true); or + // (b) s.plugin.ongoingRequest == true (the only reason why we wouldn't've exited earlier) + // + // So we actually only need to explicitly wait if there's not an ongoing request - otherwise + // we'll be notified anyways when the request is done. + if actions.PluginRequest == nil && s.plugin.alive && !s.plugin.ongoingRequest { + requiredWait = util.Min(requiredWait, now.Sub(s.plugin.lastRequest.at)) + } + + // For the vm-informant: + // if we wanted to make EITHER a downscale or upscale request, but we previously couldn't + // because of retry timeouts, we should wait for s.config.InformantRetryWait before trying + // again. + // OR if we wanted to downscale but got denied, we should wait for + // s.config.InformantDownscaleCooldown before retrying. 
+ if s.informant.ongoingRequest == nil { + // Retry upscale on failure + if wantInformantUpscaleRequest && s.informant.upscaleFailureAt != nil { + if wait := now.Sub(*s.informant.upscaleFailureAt); wait >= s.config.InformantRetryWait { + requiredWait = util.Min(requiredWait, wait) + } + } + // Retry downscale on failure + if wantInformantDownscaleRequest && s.informant.downscaleFailureAt != nil { + if wait := now.Sub(*s.informant.downscaleFailureAt); wait >= s.config.InformantRetryWait { + requiredWait = util.Min(requiredWait, wait) + } + } + // Retry downscale if denied + if wantInformantDownscaleRequest && s.informant.deniedDownscale != nil && resourcesForInformantDownscale == s.informant.deniedDownscale.requested { + if wait := now.Sub(s.informant.deniedDownscale.at); wait >= s.config.InformantDeniedDownscaleCooldown { + requiredWait = util.Min(requiredWait, wait) + } + } + } + + // If we're waiting on anything, add the action. + if requiredWait != maximumDuration { + actions.Wait = &ActionWait{Duration: requiredWait} + } + + return actions +} + +func (s *State) scalingConfig() api.ScalingConfig { + if s.vm.ScalingConfig != nil { + return *s.vm.ScalingConfig + } else { + return s.config.DefaultScalingConfig + } +} + +func (s *State) desiredResourcesFromMetricsOrRequestedUpscaling() api.Resources { + // There's some annoying edge cases that this function has to be able to handle properly. For + // the sake of completeness, they are: + // + // 1. s.vm.Using() is not a multiple of s.computeUnit + // 2. s.vm.Max() is less than s.computeUnit (or: has at least one resource that is) + // 3. s.vm.Using() is a fractional multiple of s.computeUnit, but !allowDecrease and rounding up + // is greater than s.vm.Max() + // 4. s.vm.Using() is much larger than s.vm.Min() and not a multiple of s.computeUnit, but load + // is low so we should just decrease *anyways*. + // + // --- + // + // Broadly, the implementation works like this: + // 1. Based on load average, calculate the "goal" number of CPUs (and therefore compute units) + // 2. Cap the goal CU by min/max, etc + // 3. that's it! + + // If we don't know + if s.plugin.computeUnit == nil { + return s.vm.Using() + } + + var goalCU uint32 + if s.metrics != nil { + // Goal compute unit is at the point where (CPUs) × (LoadAverageFractionTarget) == (load + // average), + // which we can get by dividing LA by LAFT. + goalCU = uint32(math.Round(float64(s.metrics.LoadAverage1Min) / s.scalingConfig().LoadAverageFractionTarget)) + } + + // Update goalCU based on any requested upscaling + goalCU = util.Max(goalCU, s.requiredCUForRequestedUpscaling(*s.plugin.computeUnit)) + + // resources for the desired "goal" compute units + var goalResources api.Resources + + // If there's no constraints from s.metrics or s.informant.requestedUpscale, then we'd prefer to + // keep things as-is, rather than scaling down (because otherwise goalCU = 0). + if s.metrics == nil && s.informant.requestedUpscale == nil { + goalResources = s.vm.Using() + } else { + goalResources = s.plugin.computeUnit.Mul(uint16(goalCU)) + } + + // bound goal by the minimum and maximum resource amounts for the VM + result := goalResources.Min(s.vm.Max()).Max(s.vm.Min()) + + // Check that the result is sound. + // + // With the current (naive) implementation, this is trivially ok. In future versions, it might + // not be so simple, so it's good to have this integrity check here. 
+ if result.HasFieldGreaterThan(s.vm.Max()) { + panic(fmt.Errorf( + "produced invalid desiredVMState: result has field greater than max. this = %+v", s, + )) + } else if result.HasFieldLessThan(s.vm.Min()) { + panic(fmt.Errorf( + "produced invalid desiredVMState: result has field less than min. this = %+v", s, + )) + } + + return result +} + +// NB: we could just use s.plugin.computeUnit, but that's sometimes nil. This way, it's clear that +// it's the caller's responsibility to ensure that s.plugin.computeUnit != nil. +func (s *State) requiredCUForRequestedUpscaling(computeUnit api.Resources) uint32 { + if s.informant.requestedUpscale == nil { + return 0 + } + + var required uint32 + requested := s.informant.requestedUpscale.requested + + // note: floor(x / M) + 1 gives the minimum integer value greater than x / M. + + if requested.Cpu { + required = util.Max(required, uint32(s.vm.Cpu.Use/computeUnit.VCPU)+1) + } + if requested.Memory { + required = util.Max(required, uint32(s.vm.Mem.Use/computeUnit.Mem)+1) + } + + return required +} + +func (s *State) boundResourcesByInformantApproved(resources api.Resources) api.Resources { + var lowerBound api.Resources + if s.informant.approved != nil { + lowerBound = *s.informant.approved + } else { + lowerBound = s.vm.Using() + } + return resources.Max(lowerBound) +} + +func (s *State) boundResourcesByPluginApproved(resources api.Resources) api.Resources { + var upperBound api.Resources + if s.plugin.permit != nil { + upperBound = *s.plugin.permit + } else { + upperBound = s.vm.Using() + } + return resources.Min(upperBound) +} + +////////////////////////////////////////// +// PUBLIC FUNCTIONS TO UPDATE THE STATE // +////////////////////////////////////////// + +func (s *State) UpdatedVM(vm api.VmInfo) { + s.vm = vm +} + +func (s *State) UpdateMetrics(metrics api.Metrics) { + s.metrics = &metrics +} + +// PluginHandle provides write access to the scheduler plugin pieces of an UpdateState +type PluginHandle struct { + s *State +} + +func (s *State) Plugin() PluginHandle { + return PluginHandle{s} +} + +func (h PluginHandle) NewScheduler() { + h.s.plugin = pluginState{ + alive: true, + ongoingRequest: false, + computeUnit: nil, + lastRequest: nil, + permit: h.s.plugin.permit, // Keep this; trust the previous scheduler. + } +} + +func (h PluginHandle) SchedulerGone() { + h.s.plugin = pluginState{ + alive: false, + ongoingRequest: false, + computeUnit: nil, + lastRequest: nil, + permit: h.s.plugin.permit, // Keep this; trust the previous scheduler. + } +} + +func (h PluginHandle) StartingRequest(now time.Time, resources api.Resources) { + h.s.plugin.lastRequest = &pluginRequested{ + at: now, + resources: resources, + } + h.s.plugin.ongoingRequest = true +} + +func (h PluginHandle) RequestFailed(now time.Time) { + h.s.plugin.ongoingRequest = false +} + +func (h PluginHandle) RequestSuccessful(now time.Time, resp api.PluginResponse) error { + h.s.plugin.ongoingRequest = false + + if err := resp.Permit.ValidateNonZero(); err != nil { + return fmt.Errorf("Invalid permit: %w", err) + } + if err := resp.ComputeUnit.ValidateNonZero(); err != nil { + return fmt.Errorf("Invalid compute unit: %w", err) + } + + // Errors from resp in connection with the prior request + if resp.Permit.HasFieldGreaterThan(h.s.plugin.lastRequest.resources) { + return fmt.Errorf( + "Permit has resources greater than request (%+v vs. 
%+v)", + resp.Permit, h.s.plugin.lastRequest.resources, + ) + } + + // Errors from resp in connection with the prior request AND the VM state + if vmUsing := h.s.vm.Using(); resp.Permit.HasFieldLessThan(vmUsing) { + return fmt.Errorf("Permit has resources less than VM (%+v vs %+v)", resp.Permit, vmUsing) + } + + // All good - set everything. + + h.s.plugin.computeUnit = &resp.ComputeUnit + h.s.plugin.permit = &resp.Permit + return nil +} + +// InformantHandle provides write access to the vm-informant pieces of an UpdateState +type InformantHandle struct { + s *State +} + +func (s *State) Informant() InformantHandle { + return InformantHandle{s} +} + +func (h InformantHandle) Reset() { + h.s.informant = informantState{ + active: false, + ongoingRequest: nil, + requestedUpscale: nil, + deniedDownscale: nil, + approved: nil, + downscaleFailureAt: nil, + upscaleFailureAt: nil, + } +} + +func (h InformantHandle) Active(active bool) { + h.s.informant.active = active +} + +func (h InformantHandle) SuccessfullyRegistered() { + using := h.s.vm.Using() + h.s.informant.approved = &using // TODO: this is racy (although... informant synchronization should help *some* with this?) +} + +func (h InformantHandle) UpscaleRequested(now time.Time, resources api.MoreResources) { + h.s.informant.requestedUpscale = &requestedUpscale{ + at: now, + base: h.s.vm.Using(), // TODO: this is racy (maybe the resources were different when the informant originally made the request) + requested: resources, + } +} + +func (h InformantHandle) StartingUpscaleRequest(now time.Time) { + h.s.informant.ongoingRequest = &ongoingInformantRequest{kind: informantRequestKindUpscale} + h.s.informant.upscaleFailureAt = nil +} + +func (h InformantHandle) UpscaleRequestSuccessful(now time.Time, resources api.Resources) { + h.s.informant.ongoingRequest = nil + h.s.informant.approved = &resources +} + +func (h InformantHandle) UpscaleRequestFailed(now time.Time) { + h.s.informant.ongoingRequest = nil + h.s.informant.upscaleFailureAt = &now +} + +func (h InformantHandle) StartingDownscaleRequest(now time.Time) { + h.s.informant.ongoingRequest = &ongoingInformantRequest{kind: informantRequestKindDownscale} + h.s.informant.downscaleFailureAt = nil +} + +func (h InformantHandle) DownscaleRequestAllowed(now time.Time, requested api.Resources) { + h.s.informant.ongoingRequest = nil + h.s.informant.approved = &requested + h.s.informant.deniedDownscale = nil +} + +// Downscale request was successful but the informant denied our request. +func (h InformantHandle) DownscaleRequestDenied(now time.Time, requested api.Resources) { + h.s.informant.ongoingRequest = nil + h.s.informant.deniedDownscale = &deniedDownscale{ + at: now, + requested: requested, + } +} + +func (h InformantHandle) DownscaleRequestFailed(now time.Time) { + h.s.informant.ongoingRequest = nil + h.s.informant.downscaleFailureAt = &now +} + +type NeonVMHandle struct { + s *State +} + +func (s *State) NeonVM() NeonVMHandle { + return NeonVMHandle{s} +} + +func (h NeonVMHandle) StartingRequest(now time.Time, resources api.Resources) { + // FIXME: add time to ongoing request info (or maybe only in RequestFailed?) 
+ h.s.neonvm.ongoingRequested = &resources +} + +func (h NeonVMHandle) RequestSuccessful(now time.Time) { + if h.s.neonvm.ongoingRequested == nil { + panic("received NeonVM().RequestSuccessful() update without ongoing request") + } + + resources := *h.s.neonvm.ongoingRequested + + // FIXME: This is actually incorrect; we shouldn't trust that the VM has already been updated + // just because the request completed. It takes longer for the reconcile cycle(s) to make the + // necessary changes. + h.s.vm.Cpu.Use = resources.VCPU + h.s.vm.Mem.Use = resources.Mem + + h.s.neonvm.ongoingRequested = nil +} + +func (h NeonVMHandle) RequestFailed(now time.Time) { + h.s.neonvm.ongoingRequested = nil +} diff --git a/pkg/agent/execbridge.go b/pkg/agent/execbridge.go new file mode 100644 index 000000000..9eba23565 --- /dev/null +++ b/pkg/agent/execbridge.go @@ -0,0 +1,200 @@ +package agent + +// Implementations of the interfaces used by & defined in pkg/agent/executor +// +// This file is essentially the bridge between 'runner.go' and 'executor/' + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/agent/executor" + "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util" +) + +var ( + _ executor.PluginInterface = (*execPluginInterface)(nil) + _ executor.NeonVMInterface = (*execNeonVMInterface)(nil) + _ executor.InformantInterface = (*execInformantInterface)(nil) +) + +///////////////////////////////////////////////////////////// +// Scheduler Plugin -related interfaces and implementation // +///////////////////////////////////////////////////////////// + +type execPluginInterface struct { + runner *Runner + core *executor.ExecutorCore +} + +func makePluginInterface(r *Runner, core *executor.ExecutorCore) *execPluginInterface { + return &execPluginInterface{runner: r, core: core} +} + +// EmptyID implements executor.PluginInterface +func (iface *execPluginInterface) EmptyID() string { + return "" +} + +// RequestLock implements executor.PluginInterface +func (iface *execPluginInterface) RequestLock() util.ChanMutex { + return iface.runner.requestLock +} + +// GetHandle implements executor.PluginInterface +func (iface *execPluginInterface) GetHandle() executor.PluginHandle { + scheduler := iface.runner.scheduler.Load() + + if scheduler == nil { + return nil + } + + return &execPluginHandle{ + runner: iface.runner, + scheduler: scheduler, + } +} + +type execPluginHandle struct { + runner *Runner + scheduler *Scheduler +} + +// ID implements executor.PluginHandle +func (h *execPluginHandle) ID() string { + return string(h.scheduler.info.UID) +} + +// Request implements executor.PluginHandle +func (h *execPluginHandle) Request( + ctx context.Context, + logger *zap.Logger, + lastPermit *api.Resources, + target api.Resources, + metrics *api.Metrics, +) (*api.PluginResponse, error) { + if lastPermit != nil { + h.runner.recordResourceChange(*lastPermit, target, h.runner.global.metrics.schedulerRequestedChange) + } + + resp, err := h.scheduler.DoRequest(ctx, logger, target, metrics) + + if err == nil && lastPermit != nil { + h.runner.recordResourceChange(*lastPermit, target, h.runner.global.metrics.schedulerApprovedChange) + } + + return resp, err +} + +///////////////////////////////////////////////// +// NeonVM-related interface and implementation // +///////////////////////////////////////////////// + +type execNeonVMInterface struct { + runner *Runner +} + +func makeNeonVMInterface(r *Runner) *execNeonVMInterface { + return
&execNeonVMInterface{runner: r} +} + +// RequestLock implements executor.NeonVMInterface +func (iface *execNeonVMInterface) RequestLock() util.ChanMutex { + return iface.runner.requestLock +} + +// Request implements executor.NeonVMInterface +func (iface *execNeonVMInterface) Request(ctx context.Context, logger *zap.Logger, current, target api.Resources) error { + iface.runner.recordResourceChange(current, target, iface.runner.global.metrics.neonvmRequestedChange) + + err := iface.runner.doNeonVMRequest(ctx, target) + if err != nil { + return fmt.Errorf("Error making VM patch request: %w", err) + } + + return nil +} + +//////////////////////////////////////////////////// +// Informant-related interface and implementation // +//////////////////////////////////////////////////// + +type execInformantInterface struct { + runner *Runner + core *executor.ExecutorCore +} + +func makeInformantInterface(r *Runner, core *executor.ExecutorCore) *execInformantInterface { + return &execInformantInterface{runner: r, core: core} +} + +// EmptyID implements executor.InformantInterface +func (iface *execInformantInterface) EmptyID() string { + return "" +} + +func (iface *execInformantInterface) GetHandle() executor.InformantHandle { + server := iface.runner.server.Load() + + if server == nil || server.ExitStatus() != nil { + return nil + } + + return &execInformantHandle{server: server} +} + +type execInformantHandle struct { + server *InformantServer +} + +func (h *execInformantHandle) ID() string { + return h.server.desc.AgentID.String() +} + +func (h *execInformantHandle) RequestLock() util.ChanMutex { + return h.server.requestLock +} + +func (h *execInformantHandle) Downscale( + ctx context.Context, + logger *zap.Logger, + current api.Resources, + target api.Resources, +) (*api.DownscaleResult, error) { + // Check validity of the message we're sending + if target.HasFieldGreaterThan(current) { + innerMsg := fmt.Errorf("%+v has field greater than %+v", target, current) + panic(fmt.Errorf("(*execInformantHandle).Downscale() called with target greater than current: %w", innerMsg)) + } + + h.server.runner.recordResourceChange(current, target, h.server.runner.global.metrics.informantRequestedChange) + + result, err := h.server.Downscale(ctx, logger, target) + + if err == nil && result.Ok { + h.server.runner.recordResourceChange(current, target, h.server.runner.global.metrics.informantApprovedChange) + } + + return result, err +} + +func (h *execInformantHandle) Upscale(ctx context.Context, logger *zap.Logger, current, target api.Resources) error { + // Check validity of the message we're sending + if target.HasFieldLessThan(current) { + innerMsg := fmt.Errorf("%+v has field less than %+v", target, current) + panic(fmt.Errorf("(*execInformantHandle).Upscale() called with target less than current: %w", innerMsg)) + } + + h.server.runner.recordResourceChange(current, target, h.server.runner.global.metrics.informantRequestedChange) + + err := h.server.Upscale(ctx, logger, target) + + if err == nil { + h.server.runner.recordResourceChange(current, target, h.server.runner.global.metrics.informantApprovedChange) + } + + return err +} diff --git a/pkg/agent/executor/core.go b/pkg/agent/executor/core.go new file mode 100644 index 000000000..5ae6cfd10 --- /dev/null +++ b/pkg/agent/executor/core.go @@ -0,0 +1,155 @@ +package executor + +// Consumers of pkg/agent/core, implementing the "executors" for each type of action.
These are +// wrapped up into a single ExecutorCore type, which exposes some methods for the various executors. +// +// The executors use various abstract interfaces for the scheduler / NeonVM / informant. The +// implementations of those interfaces are defined in pkg/agent/execbridge.go + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/agent/core" + "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util" +) + +type Config = core.Config + +type ExecutorCore struct { + mu sync.Mutex + + stateLogger *zap.Logger + + core *core.State + actions *timedActions + + updates *util.Broadcaster +} + +type ClientSet struct { + Plugin PluginInterface + NeonVM NeonVMInterface + Informant InformantInterface +} + +func NewExecutorCore(stateLogger *zap.Logger, vm api.VmInfo, config core.Config) *ExecutorCore { + return &ExecutorCore{ + mu: sync.Mutex{}, + stateLogger: stateLogger, + core: core.NewState(vm, config), + actions: nil, // (*ExecutorCore).getActions() checks if this is nil + updates: util.NewBroadcaster(), + } +} + +type ExecutorCoreWithClients struct { + *ExecutorCore + + clients ClientSet +} + +func (c *ExecutorCore) WithClients(clients ClientSet) ExecutorCoreWithClients { + return ExecutorCoreWithClients{ + ExecutorCore: c, + clients: clients, + } +} + +type timedActions struct { + calculatedAt time.Time + actions core.ActionSet +} + +func (c *ExecutorCore) getActions() timedActions { + c.mu.Lock() + defer c.mu.Unlock() + + if c.actions == nil { + // NOTE: Even though we cache the actions generated using time.Now(), it's *generally* ok. + now := time.Now() + c.stateLogger.Info("Recalculating ActionSet", zap.Time("now", now), zap.Any("state", c.core.Dump())) + c.actions = &timedActions{calculatedAt: now, actions: c.core.NextActions(now)} + } + + return *c.actions +} + +func (c *ExecutorCore) update(with func(*core.State)) { + c.mu.Lock() + defer c.mu.Unlock() + + // NB: We broadcast the update *before* calling with() because this gets us nicer ordering + // guarantees in some cases.
+ c.updates.Broadcast() + c.actions = nil + with(c.core) +} + +// Updater returns a handle on the object used for making external changes to the ExecutorCore, +// beyond what's provided by the various client (ish) interfaces +func (c *ExecutorCore) Updater() ExecutorCoreUpdater { + return ExecutorCoreUpdater{c} +} + +// ExecutorCoreUpdater provides a common interface for external changes to the ExecutorCore +type ExecutorCoreUpdater struct { + core *ExecutorCore +} + +func (c ExecutorCoreUpdater) UpdateMetrics(metrics api.Metrics, withLock func()) { + c.core.update(func(state *core.State) { + state.UpdateMetrics(metrics) + withLock() + }) +} + +// NewScheduler updates the inner state, calling (*core.State).Plugin().NewScheduler() +func (c ExecutorCoreUpdater) NewScheduler(withLock func()) { + c.core.update(func(state *core.State) { + state.Plugin().NewScheduler() + withLock() + }) +} + +// SchedulerGone updates the inner state, calling (*core.State).Plugin().SchedulerGone() +func (c ExecutorCoreUpdater) SchedulerGone(withLock func()) { + c.core.update(func(state *core.State) { + state.Plugin().SchedulerGone() + withLock() + }) +} + +func (c ExecutorCoreUpdater) ResetInformant(withLock func()) { + c.core.update(func(state *core.State) { + state.Informant().Reset() + withLock() + }) +} + +func (c ExecutorCoreUpdater) UpscaleRequested(resources api.MoreResources, withLock func()) { + c.core.update(func(state *core.State) { + state.Informant().UpscaleRequested(time.Now(), resources) + withLock() + }) +} + +func (c ExecutorCoreUpdater) InformantRegistered(active bool, withLock func()) { + c.core.update(func(state *core.State) { + state.Informant().SuccessfullyRegistered() + if active { + state.Informant().Active(active) + } + withLock() + }) +} + +func (c ExecutorCoreUpdater) InformantActive(active bool, withLock func()) { + c.core.update(func(state *core.State) { + state.Informant().Active(active) + withLock() + }) +} diff --git a/pkg/agent/executor/exec_informant.go b/pkg/agent/executor/exec_informant.go new file mode 100644 index 000000000..6f758d079 --- /dev/null +++ b/pkg/agent/executor/exec_informant.go @@ -0,0 +1,266 @@ +package executor + +import ( + "context" + "errors" + "time" + + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/agent/core" + "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util" +) + +type InformantInterface interface { + EmptyID() string + GetHandle() InformantHandle +} + +type InformantHandle interface { + ID() string + RequestLock() util.ChanMutex + Downscale(_ context.Context, _ *zap.Logger, current, target api.Resources) (*api.DownscaleResult, error) + Upscale(_ context.Context, _ *zap.Logger, current, target api.Resources) error +} + +func (c *ExecutorCoreWithClients) DoInformantDownscales(ctx context.Context, logger *zap.Logger) { + var ( + updates util.BroadcastReceiver = c.updates.NewReceiver() + requestLock util.ChanMutex = util.NewChanMutex() + ifaceLogger *zap.Logger = logger.Named("client") + ) + + holdingRequestLock := false + releaseRequestLockIfHolding := func() { + if holdingRequestLock { + requestLock.Unlock() + holdingRequestLock = false + } + } + defer releaseRequestLockIfHolding() + + // meant to be called while holding c's lock + idUnchanged := func(current string) bool { + if h := c.clients.Informant.GetHandle(); h != nil { + return current == h.ID() + } else { + return current == c.clients.Informant.EmptyID() + } + } + + last := c.getActions() + for { + releaseRequestLockIfHolding() + + // 
Always receive an update if there is one. This helps with reliability (better guarantees + // about not missing updates) and means that the switch statements can be simpler. + select { + case <-updates.Wait(): + updates.Awake() + last = c.getActions() + default: + } + + // Wait until we're supposed to make a request. + if last.actions.InformantDownscale == nil { + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + } + } + + action := *last.actions.InformantDownscale + + informant := c.clients.Informant.GetHandle() + + if informant != nil { + requestLock = informant.RequestLock() + + // Try to acquire the request lock, but if something happens while we're waiting, we'll + // abort & retry on the next loop iteration (or maybe not, if last.actions changed). + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + case <-requestLock.WaitLock(): + holdingRequestLock = true + } + } + + var startTime time.Time + c.update(func(state *core.State) { + logger.Info("Starting informant downscale request", zap.Any("action", action)) + startTime = time.Now() + state.Informant().StartingDownscaleRequest(startTime) + }) + + result, err := doSingleInformantDownscaleRequest(ctx, ifaceLogger, informant, action) + endTime := time.Now() + + c.update(func(state *core.State) { + unchanged := idUnchanged(informant.ID()) + logFields := []zap.Field{ + zap.Any("action", action), + zap.Duration("duration", endTime.Sub(startTime)), + zap.Bool("unchanged", unchanged), + } + + if err != nil { + logger.Error("Informant downscale request failed", append(logFields, zap.Error(err))...) + if unchanged { + state.Informant().DownscaleRequestFailed(endTime) + } + return + } + + logFields = append(logFields, zap.Any("response", result)) + + if !result.Ok { + logger.Warn("Informant denied downscale", logFields...) + if unchanged { + state.Informant().DownscaleRequestDenied(endTime, action.Target) + } + } else { + logger.Info("Informant approved downscale", logFields...) + if unchanged { + state.Informant().DownscaleRequestAllowed(endTime, action.Target) + } + } + }) + } +} + +func doSingleInformantDownscaleRequest( + ctx context.Context, + logger *zap.Logger, + iface InformantHandle, + action core.ActionInformantDownscale, +) (*api.DownscaleResult, error) { + if iface == nil { + return nil, errors.New("No currently active informant") + } + + return iface.Downscale(ctx, logger, action.Current, action.Target) +} + +func (c *ExecutorCoreWithClients) DoInformantUpscales(ctx context.Context, logger *zap.Logger) { + var ( + updates util.BroadcastReceiver = c.updates.NewReceiver() + requestLock util.ChanMutex = util.NewChanMutex() + ifaceLogger *zap.Logger = logger.Named("client") + ) + + holdingRequestLock := false + releaseRequestLockIfHolding := func() { + if holdingRequestLock { + requestLock.Unlock() + holdingRequestLock = false + } + } + defer releaseRequestLockIfHolding() + + // meant to be called while holding c's lock + idUnchanged := func(current string) bool { + if h := c.clients.Informant.GetHandle(); h != nil { + return current == h.ID() + } else { + return current == c.clients.Informant.EmptyID() + } + } + + last := c.getActions() + for { + releaseRequestLockIfHolding() + + // Always receive an update if there is one. 
This helps with reliability (better guarantees + // about not missing updates) and means that the switch statements can be simpler. + select { + case <-updates.Wait(): + updates.Awake() + last = c.getActions() + default: + } + + // Wait until we're supposed to make a request. + if last.actions.InformantUpscale == nil { + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + } + } + + action := *last.actions.InformantUpscale + + informant := c.clients.Informant.GetHandle() + + if informant != nil { + requestLock = informant.RequestLock() + + // Try to acquire the request lock, but if something happens while we're waiting, we'll + // abort & retry on the next loop iteration (or maybe not, if last.actions changed). + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + case <-requestLock.WaitLock(): + holdingRequestLock = true + } + } + + var startTime time.Time + c.update(func(state *core.State) { + logger.Info("Starting informant upscale request", zap.Any("action", action)) + startTime = time.Now() + state.Informant().StartingUpscaleRequest(startTime) + }) + + err := doSingleInformantUpscaleRequest(ctx, ifaceLogger, informant, action) + endTime := time.Now() + + c.update(func(state *core.State) { + unchanged := idUnchanged(informant.ID()) + logFields := []zap.Field{ + zap.Any("action", action), + zap.Duration("duration", endTime.Sub(startTime)), + zap.Bool("unchanged", unchanged), + } + + if err != nil { + logger.Error("Informant upscale request failed", append(logFields, zap.Error(err))...) + if unchanged { + state.Informant().UpscaleRequestFailed(endTime) + } + return + } + + logger.Info("Informant upscale request successful", logFields...) + if unchanged { + state.Informant().UpscaleRequestSuccessful(endTime, action.Target) + } + }) + } +} + +func doSingleInformantUpscaleRequest( + ctx context.Context, + logger *zap.Logger, + iface InformantHandle, + action core.ActionInformantUpscale, +) error { + if iface == nil { + return errors.New("No currently active informant") + } + + return iface.Upscale(ctx, logger, action.Current, action.Target) +} diff --git a/pkg/agent/executor/exec_neonvm.go b/pkg/agent/executor/exec_neonvm.go new file mode 100644 index 000000000..62f89d988 --- /dev/null +++ b/pkg/agent/executor/exec_neonvm.go @@ -0,0 +1,94 @@ +package executor + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/agent/core" + "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util" +) + +type NeonVMInterface interface { + RequestLock() util.ChanMutex + Request(_ context.Context, _ *zap.Logger, current, target api.Resources) error +} + +func (c *ExecutorCoreWithClients) DoNeonVMRequests(ctx context.Context, logger *zap.Logger) { + var ( + updates util.BroadcastReceiver = c.updates.NewReceiver() + requestLock util.ChanMutex = c.clients.NeonVM.RequestLock() + ifaceLogger *zap.Logger = logger.Named("client") + ) + + holdingRequestLock := false + releaseRequestLockIfHolding := func() { + if holdingRequestLock { + requestLock.Unlock() + holdingRequestLock = false + } + } + defer releaseRequestLockIfHolding() + + last := c.getActions() + for { + releaseRequestLockIfHolding() + + // Always receive an update if there is one. 
This helps with reliability (better guarantees + // about not missing updates) and means that the switch statements can be simpler. + select { + case <-updates.Wait(): + updates.Awake() + last = c.getActions() + default: + } + + // Wait until we're supposed to make a request. + if last.actions.NeonVMRequest == nil { + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + } + } + + action := *last.actions.NeonVMRequest + + // Try to acquire the request lock, but if something happens while we're waiting, we'll + // abort & retry on the next loop iteration (or maybe not, if last.actions changed). + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + case <-requestLock.WaitLock(): + holdingRequestLock = true + } + + var startTime time.Time + c.update(func(state *core.State) { + logger.Info("Starting NeonVM request", zap.Any("action", action)) + startTime = time.Now() + state.NeonVM().StartingRequest(startTime, action.Target) + }) + + err := c.clients.NeonVM.Request(ctx, ifaceLogger, action.Current, action.Target) + endTime := time.Now() + logFields := []zap.Field{zap.Any("action", action), zap.Duration("duration", endTime.Sub(startTime))} + + c.update(func(state *core.State) { + if err != nil { + logger.Error("NeonVM request failed", append(logFields, zap.Error(err))...) + state.NeonVM().RequestFailed(endTime) + } else /* err == nil */ { + logger.Info("NeonVM request successful", logFields...) + state.NeonVM().RequestSuccessful(endTime) + } + }) + } +} diff --git a/pkg/agent/executor/exec_plugin.go b/pkg/agent/executor/exec_plugin.go new file mode 100644 index 000000000..bcf5b7670 --- /dev/null +++ b/pkg/agent/executor/exec_plugin.go @@ -0,0 +1,138 @@ +package executor + +import ( + "context" + "errors" + "time" + + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/agent/core" + "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util" +) + +type PluginInterface interface { + EmptyID() string + RequestLock() util.ChanMutex + GetHandle() PluginHandle +} + +type PluginHandle interface { + ID() string + Request(_ context.Context, _ *zap.Logger, lastPermit *api.Resources, target api.Resources, _ *api.Metrics) (*api.PluginResponse, error) +} + +func (c *ExecutorCoreWithClients) DoPluginRequests(ctx context.Context, logger *zap.Logger) { + var ( + updates util.BroadcastReceiver = c.updates.NewReceiver() + requestLock util.ChanMutex = c.clients.Plugin.RequestLock() + ifaceLogger *zap.Logger = logger.Named("client") + ) + + holdingRequestLock := false + releaseRequestLockIfHolding := func() { + if holdingRequestLock { + requestLock.Unlock() + holdingRequestLock = false + } + } + defer releaseRequestLockIfHolding() + + idUnchanged := func(current string) bool { + if h := c.clients.Plugin.GetHandle(); h != nil { + return current == h.ID() + } else { + return current == c.clients.Plugin.EmptyID() + } + } + + last := c.getActions() + for { + releaseRequestLockIfHolding() + + // Always receive an update if there is one. This helps with reliability (better guarantees + // about not missing updates) and means that the switch statements can be simpler. + select { + case <-updates.Wait(): + updates.Awake() + last = c.getActions() + default: + } + + // Wait until we're supposed to make a request. 
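+		// (If there's no plugin request to make right now, block until either the next update
+		// arrives or the context is canceled.)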
+ if last.actions.PluginRequest == nil { + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + } + } + + action := *last.actions.PluginRequest + + pluginIface := c.clients.Plugin.GetHandle() + + // Try to acquire the request lock, but if something happens while we're waiting, we'll + // abort & retry on the next loop iteration (or maybe not, if last.actions changed). + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + // NB: don't .Awake(); allow that to be handled at the top of the loop. + continue + case <-requestLock.WaitLock(): + holdingRequestLock = true + } + + // update the state to indicate that the request is starting. + var startTime time.Time + c.update(func(state *core.State) { + logger.Info("Starting plugin request", zap.Any("action", action)) + startTime = time.Now() + state.Plugin().StartingRequest(startTime, action.Target) + }) + + resp, err := doSinglePluginRequest(ctx, ifaceLogger, pluginIface, action) + endTime := time.Now() + + c.update(func(state *core.State) { + unchanged := idUnchanged(pluginIface.ID()) + logFields := []zap.Field{ + zap.Any("action", action), + zap.Duration("duration", endTime.Sub(startTime)), + zap.Bool("unchanged", unchanged), + } + + if err != nil { + logger.Error("Plugin request failed", append(logFields, zap.Error(err))...) + if unchanged { + state.Plugin().RequestFailed(endTime) + } + } else { + logFields = append(logFields, zap.Any("response", resp)) + logger.Info("Plugin request successful", logFields...) + if unchanged { + if err := state.Plugin().RequestSuccessful(endTime, *resp); err != nil { + logger.Error("Plugin response validation failed", append(logFields, zap.Error(err))...) + } + } + } + }) + } +} + +func doSinglePluginRequest( + ctx context.Context, + logger *zap.Logger, + iface PluginHandle, + action core.ActionPluginRequest, +) (*api.PluginResponse, error) { + if iface == nil { + return nil, errors.New("No currently enabled plugin handle") + } + + return iface.Request(ctx, logger, action.LastPermit, action.Target, action.Metrics) +} diff --git a/pkg/agent/executor/exec_sleeper.go b/pkg/agent/executor/exec_sleeper.go new file mode 100644 index 000000000..4208491df --- /dev/null +++ b/pkg/agent/executor/exec_sleeper.go @@ -0,0 +1,68 @@ +package executor + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/agent/core" +) + +func (c *ExecutorCore) DoSleeper(ctx context.Context, logger *zap.Logger) { + updates := c.updates.NewReceiver() + + // preallocate the timer. We clear it at the top of the loop; the 0 duration is just because we + // need *some* value, so it might as well be zero. + timer := time.NewTimer(0) + defer timer.Stop() + + last := c.getActions() + for { + // Ensure the timer is cleared at the top of the loop + if !timer.Stop() { + <-timer.C + } + + // If NOT waiting for a particular duration: + if last.actions.Wait == nil { + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + updates.Awake() + last = c.getActions() + } + } + + // If YES waiting for a particular duration + if last.actions.Wait != nil { + // NB: It's possible for last.calculatedAt to be somewhat out of date. 
It's *probably* + // fine, because we'll be given a notification any time the state has changed, so we + // should wake from a select soon enough to get here + timer.Reset(last.actions.Wait.Duration) + + select { + case <-ctx.Done(): + return + case <-updates.Wait(): + updates.Awake() + + last = c.getActions() + case <-timer.C: + select { + // If there's also an update, then let that take preference: + case <-updates.Wait(): + updates.Awake() + last = c.getActions() + // Otherwise, trigger cache invalidation because we've waited for the requested + // amount of time: + default: + c.update(func(*core.State) {}) + updates.Awake() + last = c.getActions() + } + } + } + } +} diff --git a/pkg/agent/globalstate.go b/pkg/agent/globalstate.go index 41381992b..db877e634 100644 --- a/pkg/agent/globalstate.go +++ b/pkg/agent/globalstate.go @@ -329,17 +329,16 @@ func (s *agentState) newRunner(vmInfo api.VmInfo, podName util.NamespacedName, p status: nil, // set by calller schedulerRespondedWithMigration: false, - shutdown: nil, // set by (*Runner).Run - vm: vmInfo, - podName: podName, - podIP: podIP, - lock: util.NewChanMutex(), - requestLock: util.NewChanMutex(), - requestedUpscale: api.MoreResources{Cpu: false, Memory: false}, + shutdown: nil, // set by (*Runner).Run + vm: vmInfo, + podName: podName, + podIP: podIP, + lock: util.NewChanMutex(), + requestLock: util.NewChanMutex(), lastMetrics: nil, - scheduler: nil, - server: nil, + scheduler: atomic.Pointer[Scheduler]{}, + server: atomic.Pointer[InformantServer]{}, informant: nil, computeUnit: nil, lastApproved: nil, diff --git a/pkg/agent/informant.go b/pkg/agent/informant.go index a37269724..9b38895da 100644 --- a/pkg/agent/informant.go +++ b/pkg/agent/informant.go @@ -11,6 +11,7 @@ import ( "net/http" "strconv" "strings" + "sync/atomic" "time" "github.com/google/uuid" @@ -70,13 +71,8 @@ type InformantServer struct { // This field MAY be read while holding EITHER runner.lock OR requestLock. mode InformantServerMode - // updatedInformant is signalled once, when the InformantServer's register request completes, - // and the value of runner.informant is updated. - updatedInformant util.CondChannelSender - - // upscaleRequested is signalled whenever a valid request on /try-upscale is received, with at - // least one field set to true (i.e., at least one resource is being requested). - upscaleRequested util.CondChannelSender + // callbacks provide an abstraction for + callbacks informantStateCallbacks // requestLock guards requests to the VM informant to make sure that only one request is being // made at a time. @@ -86,7 +82,7 @@ type InformantServer struct { requestLock util.ChanMutex // exitStatus holds some information about why the server exited - exitStatus *InformantServerExitStatus + exitStatus atomic.Pointer[InformantServerExitStatus] // exit signals that the server should shut down, and sets exitStatus to status. 
// @@ -130,8 +126,7 @@ func NewInformantServer( ctx context.Context, logger *zap.Logger, runner *Runner, - updatedInformant util.CondChannelSender, - upscaleRequested util.CondChannelSender, + callbacks informantStateCallbacks, ) (*InformantServer, util.SignalReceiver, error) { // Manually start the TCP listener so that we can see the port it's assigned addr := net.TCPAddr{IP: net.IPv4zero, Port: 0 /* 0 means it'll be assigned any(-ish) port */} @@ -157,16 +152,15 @@ func NewInformantServer( MinProtoVersion: MinInformantProtocolVersion, MaxProtoVersion: MaxInformantProtocolVersion, }, - seqNum: 0, - receivedIDCheck: false, - madeContact: false, - protoVersion: nil, - mode: InformantServerUnconfirmed, - updatedInformant: updatedInformant, - upscaleRequested: upscaleRequested, - requestLock: util.NewChanMutex(), - exitStatus: nil, - exit: nil, // see below. + seqNum: 0, + receivedIDCheck: false, + madeContact: false, + protoVersion: nil, + mode: InformantServerUnconfirmed, + callbacks: callbacks, + requestLock: util.NewChanMutex(), + exitStatus: atomic.Pointer[InformantServerExitStatus]{}, + exit: nil, // see below. } logger = logger.With(zap.Object("server", server.desc)) @@ -188,8 +182,7 @@ func NewInformantServer( cancelBackground() // Set server.exitStatus if isn't already - if server.exitStatus == nil { - server.exitStatus = &status + if swapped := server.exitStatus.CompareAndSwap(nil, &status); swapped { logFunc := logger.Warn if status.RetryShouldFix { logFunc = logger.Info @@ -240,15 +233,10 @@ func NewInformantServer( // set server.exitStatus if it isn't already -- generally this should only occur if err // isn't http.ErrServerClosed, because other server exits should be controlled by - runner.lock.Lock() - defer runner.lock.Unlock() - - if server.exitStatus == nil { - server.exitStatus = &InformantServerExitStatus{ - Err: fmt.Errorf("Unexpected exit: %w", err), - RetryShouldFix: false, - } - } + server.exitStatus.CompareAndSwap(nil, &InformantServerExitStatus{ + Err: fmt.Errorf("Unexpected exit: %w", err), + RetryShouldFix: false, + }) }) // Thread waiting for the context to be canceled so we can use it to shut down the server @@ -319,7 +307,7 @@ func IsNormalInformantError(err error) bool { // // This method MUST be called while holding s.runner.lock. func (s *InformantServer) valid() error { - if s.exitStatus != nil { + if s.exitStatus.Load() != nil { return InformantServerAlreadyExitedError } @@ -334,7 +322,7 @@ func (s *InformantServer) valid() error { panic(fmt.Errorf("Unexpected InformantServerMode %q", s.mode)) } - if s.runner.server != s { + if s.runner.server.Load() != s { return InformantServerNotCurrentError } return nil @@ -342,13 +330,8 @@ func (s *InformantServer) valid() error { // ExitStatus returns the InformantServerExitStatus associated with the server, if it has been // instructed to exit -// -// This method MUST NOT be called while holding s.runner.lock. 
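+//
+// Now that exitStatus is stored atomically, this method is safe to call without holding any
+// locks.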
func (s *InformantServer) ExitStatus() *InformantServerExitStatus { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - return s.exitStatus + return s.exitStatus.Load() } // setLastInformantError is a helper method to abbreviate setting the Runner's lastInformantError @@ -362,7 +345,7 @@ func (s *InformantServer) setLastInformantError(err error, runnerLocked bool) { defer s.runner.lock.Unlock() } - if s.runner.server == s { + if s.runner.server.Load() == s { s.runner.lastInformantError = err } } @@ -396,7 +379,7 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context, logger *zap panic(fmt.Errorf("Unexpected InformantServerMode %q", s.mode)) } - if s.exitStatus != nil { + if s.ExitStatus() != nil { err := InformantServerAlreadyExitedError s.setLastInformantError(err, true) return err @@ -468,22 +451,25 @@ func (s *InformantServer) RegisterWithInformant(ctx context.Context, logger *zap s.mode = InformantServerSuspended s.protoVersion = &resp.ProtoVersion - if s.runner.server == s { - oldInformant := s.runner.informant - s.runner.informant = resp - s.updatedInformant.Send() // signal we've changed the informant - - if oldInformant == nil { - logger.Info("Registered with informant", zap.Any("informant", *resp)) - } else if *oldInformant != *resp { - logger.Info( - "Re-registered with informant, InformantDesc changed", - zap.Any("oldInformant", *oldInformant), - zap.Any("informant", *resp), - ) - } else { - logger.Info("Re-registered with informant; InformantDesc unchanged", zap.Any("informant", *oldInformant)) - } + if s.runner.server.Load() == s { + // signal we've changed the informant, and do the logging while we're at it, so there's + // a synchronous record of what happened. + s.callbacks.registered(false, func() { + oldInformant := s.runner.informant + s.runner.informant = resp + + if oldInformant == nil { + logger.Info("Registered with informant", zap.Any("informant", *resp)) + } else if *oldInformant != *resp { + logger.Info( + "Re-registered with informant, InformantDesc changed", + zap.Any("oldInformant", *oldInformant), + zap.Any("informant", *resp), + ) + } else { + logger.Info("Re-registered with informant; InformantDesc unchanged", zap.Any("informant", *oldInformant)) + } + }) } else { logger.Warn("Registering with informant completed but the server has already been replaced") } @@ -687,7 +673,7 @@ func (s *InformantServer) handleID(ctx context.Context, _ *zap.Logger, body *str s.receivedIDCheck = true - if s.exitStatus != nil { + if s.ExitStatus() != nil { return nil, 404, errors.New("Server has already exited") } @@ -726,7 +712,7 @@ func (s *InformantServer) handleResume( s.runner.lock.Lock() defer s.runner.lock.Unlock() - if s.exitStatus != nil { + if s.ExitStatus() != nil { return nil, 404, errors.New("Server has already exited") } @@ -737,12 +723,14 @@ func (s *InformantServer) handleResume( switch s.mode { case InformantServerSuspended: s.mode = InformantServerRunning - logger.Info( - "Informant server mode updated", - zap.String("action", "resume"), - zap.String("oldMode", string(InformantServerSuspended)), - zap.String("newMode", string(InformantServerRunning)), - ) + s.callbacks.setActive(true, func() { + logger.Info( + "Informant server mode updated", + zap.String("action", "resume"), + zap.String("oldMode", string(InformantServerSuspended)), + zap.String("newMode", string(InformantServerRunning)), + ) + }) case InformantServerRunning: internalErr := errors.New("Got /resume request for server, but it is already running") logger.Warn("Protocol 
violation", zap.Error(internalErr)) @@ -799,19 +787,21 @@ func (s *InformantServer) handleSuspend( } }() - if s.exitStatus != nil { + if s.ExitStatus() != nil { return nil, 404, errors.New("Server has already exited") } switch s.mode { case InformantServerRunning: s.mode = InformantServerSuspended - logger.Info( - "Informant server mode updated", - zap.String("action", "suspend"), - zap.String("oldMode", string(InformantServerRunning)), - zap.String("newMode", string(InformantServerSuspended)), - ) + s.callbacks.setActive(false, func() { + logger.Info( + "Informant server mode updated", + zap.String("action", "suspend"), + zap.String("oldMode", string(InformantServerRunning)), + zap.String("newMode", string(InformantServerSuspended)), + ) + }) case InformantServerSuspended: internalErr := errors.New("Got /suspend request for server, but it is already suspended") logger.Warn("Protocol violation", zap.Error(internalErr)) @@ -876,7 +866,7 @@ func (s *InformantServer) handleTryUpscale( s.runner.lock.Lock() defer s.runner.lock.Unlock() - if s.exitStatus != nil { + if s.ExitStatus() != nil { return nil, 404, errors.New("Server has already exited") } @@ -887,18 +877,16 @@ func (s *InformantServer) handleTryUpscale( return nil, 400, err } - if body.MoreResources.Cpu || body.MoreResources.Memory { - s.upscaleRequested.Send() - } else { - logger.Warn("Received try-upscale request that has no resources selected") - } + s.callbacks.upscaleRequested(body.MoreResources, func() { + if !body.MoreResources.Cpu && !body.MoreResources.Memory { + logger.Warn("Received try-upscale request that has no resources selected") + } - logger.Info( - "Updating requested upscale", - zap.Any("oldRequested", s.runner.requestedUpscale), - zap.Any("newRequested", body.MoreResources), - ) - s.runner.requestedUpscale = body.MoreResources + logger.Info( + "Updating requested upscale", + zap.Any("requested", body.MoreResources), + ) + }) return &api.AgentIdentificationMessage{ Data: api.AgentIdentification{AgentID: s.desc.AgentID}, @@ -982,11 +970,8 @@ func (s *InformantServer) HealthCheck(ctx context.Context, logger *zap.Logger) ( // Downscale makes a request to the informant's /downscale endpoint with the api.Resources // -// This method MUST NOT be called while holding i.server.runner.lock OR i.server.requestLock. +// This method MUST NOT be called while holding i.server.runner.lock. 
func (s *InformantServer) Downscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error) { - s.requestLock.Lock() - defer s.requestLock.Unlock() - err := func() error { s.runner.lock.Lock() defer s.runner.lock.Unlock() @@ -1039,9 +1024,6 @@ func (s *InformantServer) Downscale(ctx context.Context, logger *zap.Logger, to } func (s *InformantServer) Upscale(ctx context.Context, logger *zap.Logger, to api.Resources) error { - s.requestLock.Lock() - defer s.requestLock.Unlock() - err := func() error { s.runner.lock.Lock() defer s.runner.lock.Unlock() diff --git a/pkg/agent/runner.go b/pkg/agent/runner.go index bbb321237..f14a8ec81 100644 --- a/pkg/agent/runner.go +++ b/pkg/agent/runner.go @@ -52,7 +52,6 @@ import ( "errors" "fmt" "io" - "math" "net/http" "runtime/debug" "strconv" @@ -64,6 +63,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" + "github.com/neondatabase/autoscaling/pkg/agent/executor" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/util" @@ -129,16 +129,12 @@ type Runner struct { // from non-nil to nil. The data behind each pointer is immutable, but the value of the pointer // itself is not. lastMetrics *api.Metrics - // requestedUpscale provides information about any requested upscaling by a VM informant - // - // This value is reset whenever we start a new informant server - requestedUpscale api.MoreResources // scheduler is the current scheduler that we're communicating with, or nil if there isn't one. // Each scheduler's info field is immutable. When a scheduler is replaced, only the pointer // value here is updated; the original Scheduler remains unchanged. - scheduler *Scheduler - server *InformantServer + scheduler atomic.Pointer[Scheduler] + server atomic.Pointer[InformantServer] // informant holds the most recent InformantDesc that an InformantServer has received in its // normal operation. If there has been at least one InformantDesc received, this field will not // be nil. 
@@ -243,24 +239,24 @@ func (r *Runner) State(ctx context.Context) (*RunnerState, error) { defer r.lock.Unlock() var scheduler *SchedulerState - if r.scheduler != nil { + if sched := r.scheduler.Load(); sched != nil { scheduler = &SchedulerState{ - Info: r.scheduler.info, - Registered: r.scheduler.registered, - FatalError: r.scheduler.fatalError, + Info: sched.info, + Registered: sched.registered, + FatalError: sched.fatalError, } } var serverState *InformantServerState - if r.server != nil { + if server := r.server.Load(); server != nil { serverState = &InformantServerState{ - Desc: r.server.desc, - SeqNum: r.server.seqNum, - ReceivedIDCheck: r.server.receivedIDCheck, - MadeContact: r.server.madeContact, - ProtoVersion: r.server.protoVersion, - Mode: r.server.mode, - ExitStatus: r.server.exitStatus, + Desc: server.desc, + SeqNum: server.seqNum, + ReceivedIDCheck: server.receivedIDCheck, + MadeContact: server.madeContact, + ProtoVersion: server.protoVersion, + Mode: server.mode, + ExitStatus: server.exitStatus.Load(), } } @@ -349,14 +345,29 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util schedulerWatch.Using(*scheduler) } - // signal when r.lastMetrics is updated - sendMetricsSignal, recvMetricsSignal := util.NewCondChannelPair() - // signal when new schedulers are *registered* - sendSchedSignal, recvSchedSignal := util.NewCondChannelPair() - // signal when r.informant is updated - sendInformantUpd, recvInformantUpd := util.NewCondChannelPair() - // signal when the informant requests upscaling - sendUpscaleRequested, recvUpscaleRequested := util.NewCondChannelPair() + execLogger := logger.Named("exec") + + coreExecLogger := execLogger.Named("core") + executorCore := executor.NewExecutorCore(coreExecLogger.Named("state"), r.vm, executor.Config{ + DefaultScalingConfig: r.global.config.Scaling.DefaultConfig, + PluginRequestTick: time.Second * time.Duration(r.global.config.Scheduler.RequestAtLeastEverySeconds), + InformantDeniedDownscaleCooldown: time.Second * time.Duration(r.global.config.Informant.RetryDeniedDownscaleSeconds), + InformantRetryWait: time.Second * time.Duration(r.global.config.Informant.RetryFailedRequestSeconds), + Warn: func(msg string, args ...any) { + coreExecLogger.Warn(fmt.Sprintf(msg, args...)) + }, + }) + + pluginIface := makePluginInterface(r, executorCore) + neonvmIface := makeNeonVMInterface(r) + informantIface := makeInformantInterface(r, executorCore) + + // "ecwc" stands for "ExecutorCoreWithClients" + ecwc := executorCore.WithClients(executor.ClientSet{ + Plugin: pluginIface, + NeonVM: neonvmIface, + Informant: informantIface, + }) logger.Info("Starting background workers") @@ -367,17 +378,44 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util r.spawnBackgroundWorker(ctx, logger, "deadlock checker (main)", ignoreLogger(mainDeadlockChecker)) r.spawnBackgroundWorker(ctx, logger, "deadlock checker (request lock)", ignoreLogger(reqDeadlockChecker)) r.spawnBackgroundWorker(ctx, logger, "track scheduler", func(c context.Context, l *zap.Logger) { - r.trackSchedulerLoop(c, l, scheduler, schedulerWatch, sendSchedSignal) + r.trackSchedulerLoop(c, l, scheduler, schedulerWatch, func(withLock func()) { + ecwc.Updater().NewScheduler(withLock) + }) }) + sendInformantUpd, recvInformantUpd := util.NewCondChannelPair() r.spawnBackgroundWorker(ctx, logger, "get metrics", func(c context.Context, l *zap.Logger) { - r.getMetricsLoop(c, l, sendMetricsSignal, recvInformantUpd) - }) - r.spawnBackgroundWorker(ctx, logger, 
"handle VM resources", func(c context.Context, l *zap.Logger) { - r.handleVMResources(c, l, recvMetricsSignal, recvUpscaleRequested, recvSchedSignal, vmInfoUpdated) + r.getMetricsLoop(c, l, recvInformantUpd, func(metrics api.Metrics, withLock func()) { + ecwc.Updater().UpdateMetrics(metrics, withLock) + }) }) r.spawnBackgroundWorker(ctx, logger, "informant server loop", func(c context.Context, l *zap.Logger) { - r.serveInformantLoop(c, l, sendInformantUpd, sendUpscaleRequested) + r.serveInformantLoop( + c, + l, + informantStateCallbacks{ + resetInformant: func(withLock func()) { + ecwc.Updater().ResetInformant(withLock) + }, + upscaleRequested: func(request api.MoreResources, withLock func()) { + ecwc.Updater().UpscaleRequested(request, withLock) + }, + registered: func(active bool, withLock func()) { + ecwc.Updater().InformantRegistered(active, func() { + sendInformantUpd.Send() + withLock() + }) + }, + setActive: func(active bool, withLock func()) { + ecwc.Updater().InformantActive(active, withLock) + }, + }, + ) }) + r.spawnBackgroundWorker(ctx, execLogger.Named("sleeper"), "executor: sleeper", ecwc.DoSleeper) + r.spawnBackgroundWorker(ctx, execLogger.Named("plugin"), "executor: plugin", ecwc.DoPluginRequests) + r.spawnBackgroundWorker(ctx, execLogger.Named("neonvm"), "executor: neonvm", ecwc.DoNeonVMRequests) + r.spawnBackgroundWorker(ctx, execLogger.Named("informant-downscale"), "executor: informant downscale", ecwc.DoInformantDownscales) + r.spawnBackgroundWorker(ctx, execLogger.Named("informant-upscale"), "executor: informant upscale", ecwc.DoInformantUpscales) // Note: Run doesn't terminate unless the parent context is cancelled - either because the VM // pod was deleted, or the autoscaler-agent is exiting. @@ -456,8 +494,8 @@ func (r *Runner) spawnBackgroundWorker(ctx context.Context, logger *zap.Logger, func (r *Runner) getMetricsLoop( ctx context.Context, logger *zap.Logger, - newMetrics util.CondChannelSender, updatedInformant util.CondChannelReceiver, + newMetrics func(metrics api.Metrics, withLock func()), ) { timeout := time.Second * time.Duration(r.global.config.Metrics.RequestTimeoutSeconds) waitBetweenDuration := time.Second * time.Duration(r.global.config.Metrics.SecondsBetweenRequests) @@ -474,14 +512,9 @@ func (r *Runner) getMetricsLoop( goto next } - logger.Info("Got metrics", zap.Any("metrics", *metrics)) - - func() { - r.lock.Lock() - defer r.lock.Unlock() - r.lastMetrics = metrics - newMetrics.Send() - }() + newMetrics(*metrics, func() { + logger.Info("Updated metrics", zap.Any("metrics", *metrics)) + }) next: waitBetween := time.After(waitBetweenDuration) @@ -506,138 +539,11 @@ func (r *Runner) getMetricsLoop( } } -// handleVMResources is the primary background worker responsible for updating the desired state of -// the VM and communicating with the other components to make that happen, if possible. -// -// A new desired state is calculated when signalled on updatedMetrics or newScheduler. -// -// It may not be obvious at first, so: The reason why we try again when signalled on newScheduler, -// even though scheduler registration is handled separately, is that we might've had a prior desired -// increase that wasn't possible at the time (because the scheduler was unavailable) but is now -// possible, without the metrics being updated. 
-func (r *Runner) handleVMResources( - ctx context.Context, - logger *zap.Logger, - updatedMetrics util.CondChannelReceiver, - upscaleRequested util.CondChannelReceiver, - registeredScheduler util.CondChannelReceiver, - vmInfoUpdated util.CondChannelReceiver, -) { - for { - var reason VMUpdateReason - - select { - case <-ctx.Done(): - return - case <-updatedMetrics.Recv(): - reason = UpdatedMetrics - case <-upscaleRequested.Recv(): - reason = UpscaleRequested - case <-registeredScheduler.Recv(): - reason = RegisteredScheduler - case <-vmInfoUpdated.Recv(): - // Only actually do the update if something we care about changed: - newVMInfo := func() api.VmInfo { - r.status.mu.Lock() - defer r.status.mu.Unlock() - return r.status.vmInfo - }() - - if !newVMInfo.ScalingEnabled { - // This shouldn't happen because any update to the VM object that has - // ScalingEnabled=false should get translated into a "deletion" so the runner stops. - // So we shoudln't get an "update" event, and if we do, something's gone very wrong. - panic("explicit VM update given but scaling is disabled") - } - - // Update r.vm and r.lastApproved (see comment explaining why) - if changed := func() (changed bool) { - r.lock.Lock() - defer r.lock.Unlock() - - if r.vm.Mem.SlotSize.Cmp(*newVMInfo.Mem.SlotSize) != 0 { - // VM memory slot sizes can't change at runtime, at time of writing (2023-04-12). - // It's worth checking it here though, because something must have gone horribly - // wrong elsewhere for the memory slots size to change that it's worth aborting - // before anything else goes wrong - and if, in future, we allow them to change, - // it's better to panic than have subtly incorrect logic. - panic("VM changed memory slot size") - } - - // Create vm, which is r.vm with some fields taken from newVMInfo. - // - // Instead of copying r.vm, we create the entire struct explicitly so that we can - // have field exhaustiveness checking make sure that we don't forget anything when - // fields are added to api.VmInfo. - vm := api.VmInfo{ - Name: r.vm.Name, - Namespace: r.vm.Namespace, - Cpu: api.VmCpuInfo{ - Min: newVMInfo.Cpu.Min, - Use: r.vm.Cpu.Use, // TODO: Eventually we should explicitly take this as input, use newVMInfo - Max: newVMInfo.Cpu.Max, - }, - Mem: api.VmMemInfo{ - Min: newVMInfo.Mem.Min, - Use: r.vm.Mem.Use, // TODO: Eventually we should explicitly take this as input, use newVMInfo - Max: newVMInfo.Mem.Max, - - SlotSize: r.vm.Mem.SlotSize, // checked for equality above. - }, - - ScalingConfig: newVMInfo.ScalingConfig, - AlwaysMigrate: newVMInfo.AlwaysMigrate, - ScalingEnabled: newVMInfo.ScalingEnabled, // note: see above, checking newVMInfo.ScalingEnabled != false - } - - changed = vm != r.vm - r.vm = vm - - // As a final (necessary) precaution, update lastApproved so that it isn't possible - // for the scheduler to observe a temporary low upper bound that causes it to - // have state that's inconsistent with us (potentially causing overallocation). If - // we didn't handle this, the following sequence of actions would cause inconsistent - // state: - // - // 1. VM is at 4 CPU (of max 4), runner & scheduler agree - // 2. Scheduler dies - // 3. Runner loses contact with scheduler - // 4. VM Cpu.Max gets set to 2 - // 5. Runner observes Cpu.Max = 2 and forces downscale to 2 CPU - // 6. New scheduler appears, observes Cpu.Max = 2 - // 7. VM Cpu.Max gets set to 4 - // 8. Runner observes Cpu.Max = 4 (lastApproved is still 4) - // <-- INCONSISTENT STATE --> - // 9. 
Scheduler observes Cpu.Max = 4 - // - // If the runner observes the updated state before the scheduler, it's entirely - // possible for the runner to make a request that *it* thinks is just informative, - // but that the scheduler thinks is requesting more resources. At that point, the - // request can unexpectedly fail, or the scheduler can over-allocate, etc. - if r.lastApproved != nil { - *r.lastApproved = r.lastApproved.Min(vm.Max()) - } - - return - }(); !changed { - continue - } - - reason = UpdatedVMInfo - } - - err := r.updateVMResources( - ctx, logger, reason, updatedMetrics.Consume, registeredScheduler.Consume, - ) - if err != nil { - if ctx.Err() != nil { - logger.Warn("Error updating VM resources (but context already expired)", zap.Error(err)) - return - } - - logger.Error("Error updating VM resources", zap.Error(err)) - } - } +type informantStateCallbacks struct { + resetInformant func(withLock func()) + upscaleRequested func(request api.MoreResources, withLock func()) + registered func(active bool, withLock func()) + setActive func(active bool, withLock func()) } // serveInformantLoop repeatedly creates an InformantServer to handle communications with the VM @@ -647,8 +553,7 @@ func (r *Runner) handleVMResources( func (r *Runner) serveInformantLoop( ctx context.Context, logger *zap.Logger, - updatedInformant util.CondChannelSender, - upscaleRequested util.CondChannelSender, + callbacks informantStateCallbacks, ) { // variables set & accessed across loop iterations var ( @@ -664,13 +569,6 @@ func (r *Runner) serveInformantLoop( retryServer: for { - // On each (re)try, unset the informant's requested upscale. We need to do this *before* - // starting the server, because otherwise it's possible for a racy /try-upscale request to - // sneak in before we reset it, which would cause us to incorrectly ignore the request. - if upscaleRequested.Unsend() { - logger.Info("Cancelled existing 'upscale requested' signal due to informant server restart") - } - if normalRetryWait != nil { logger.Info("Retrying informant server after delay", zap.Duration("delay", normalWait)) select { @@ -701,7 +599,7 @@ retryServer: minRetryWait = time.After(minWait) lastStart = time.Now() - server, exited, err := NewInformantServer(ctx, logger, r, updatedInformant, upscaleRequested) + server, exited, err := NewInformantServer(ctx, logger, r, callbacks) if ctx.Err() != nil { if err != nil { logger.Warn("Error starting informant server (but context canceled)", zap.Error(err)) @@ -719,14 +617,14 @@ retryServer: defer r.lock.Unlock() var kind string - if r.server == nil { + if r.server.Load() == nil { kind = "Setting" } else { kind = "Updating" } logger.Info(fmt.Sprintf("%s initial informant server", kind), zap.Object("server", server.desc)) - r.server = server + r.server.Store(server) }() logger.Info("Registering with informant") @@ -790,7 +688,7 @@ func (r *Runner) trackSchedulerLoop( logger *zap.Logger, init *schedwatch.SchedulerInfo, schedulerWatch schedwatch.SchedulerWatch, - registeredScheduler util.CondChannelSender, + newScheduler func(withLock func()), ) { // pre-declare a bunch of variables because we have some gotos here. var ( @@ -819,9 +717,9 @@ startScheduler: fatal = func() util.SignalReceiver { logger := logger.With(zap.Object("scheduler", currentInfo)) - // Print info about a new scheduler, unless this is the first one. 
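+		// Choose the wording for the log message below: "Setting" if this is the scheduler we
+		// started with, "Updating" otherwise.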
+ verb := "Setting" if init == nil || init.UID != currentInfo.UID { - logger.Info("Updating scheduler pod") + verb = "Updating" } sendFatal, recvFatal := util.NewSingleSignalPair() @@ -834,31 +732,14 @@ startScheduler: fatal: sendFatal, } - func() { - r.lock.Lock() - defer r.lock.Unlock() - - r.scheduler = sched - r.lastSchedulerError = nil - }() - - r.spawnBackgroundWorker(ctx, logger, "Scheduler.Register()", func(c context.Context, logger *zap.Logger) { - r.requestLock.Lock() - defer r.requestLock.Unlock() + r.lock.Lock() + defer r.lock.Unlock() - // It's possible for another thread to take responsibility for registering the - // scheduler, instead of us. Don't need to double-register. - if sched.registered { - return - } + newScheduler(func() { + logger.Info(fmt.Sprintf("%s scheduler pod", verb)) - if err := sched.Register(c, logger, registeredScheduler.Send); err != nil { - if c.Err() != nil { - logger.Warn("Error registering with scheduler (but context is done)", zap.Error(err)) - } else { - logger.Error("Error registering with scheduler", zap.Error(err)) - } - } + r.scheduler.Store(sched) + r.lastSchedulerError = nil }) return recvFatal @@ -881,20 +762,22 @@ startScheduler: r.lock.Lock() defer r.lock.Unlock() - if r.scheduler.info.UID != info.UID { + scheduler := r.scheduler.Load() + + if scheduler.info.UID != info.UID { logger.Info( "Scheduler candidate pod was deleted, but we aren't using it yet", - zap.Object("scheduler", r.scheduler.info), zap.Object("candidate", info), + zap.Object("scheduler", scheduler.info), zap.Object("candidate", info), ) return false } logger.Info( "Scheduler pod was deleted. Aborting further communication", - zap.Object("scheduler", r.scheduler.info), + zap.Object("scheduler", scheduler.info), ) - r.scheduler = nil + r.scheduler.Store(nil) return true }() @@ -975,10 +858,10 @@ func (r *Runner) doMetricsRequestIfEnabled( // nice to have have the guarantees around not racing. clearNewInformantSignal() - if r.server == nil || r.server.mode != InformantServerRunning { + if server := r.server.Load(); server == nil || server.mode != InformantServerRunning { var state = "unset" - if r.server != nil { - state = string(r.server.mode) + if server != nil { + state = string(server.mode) } logger.Info(fmt.Sprintf("Cannot make metrics request because informant server is %s", state)) @@ -1041,454 +924,7 @@ func (r *Runner) doMetricsRequestIfEnabled( return handle(body) } -// VMUpdateReason provides context to (*Runner).updateVMResources about why an update to the VM's -// resources has been requested -type VMUpdateReason string - -const ( - UpdatedMetrics VMUpdateReason = "metrics" - UpscaleRequested VMUpdateReason = "upscale requested" - RegisteredScheduler VMUpdateReason = "scheduler" - UpdatedVMInfo VMUpdateReason = "updated VM info" -) - -// atomicUpdateState holds some pre-validated data for (*Runner).updateVMResources, fetched -// atomically (i.e. all at once, while holding r.lock) with the (*Runner).atomicState method -// -// Because atomicState is able to return nil when there isn't yet enough information to update the -// VM's resources, some validation is already guaranteed by representing the data without pointers. 
-type atomicUpdateState struct { - computeUnit api.Resources - metrics api.Metrics - vm api.VmInfo - lastApproved api.Resources - requestedUpscale api.MoreResources - config api.ScalingConfig -} - -// updateVMResources is responsible for the high-level logic that orchestrates a single update to -// the VM's resources - or possibly just informing the scheduler that nothing's changed. -// -// This method sometimes returns nil if the reason we couldn't perform the update was solely because -// other information was missing (e.g., we haven't yet contacted a scheduler). In these cases, an -// appropriate message is logged. -func (r *Runner) updateVMResources( - ctx context.Context, - logger *zap.Logger, - reason VMUpdateReason, - clearUpdatedMetricsSignal func(), - clearNewSchedulerSignal func(), -) error { - // Acquiring this lock *may* take a while, so we'll allow it to be interrupted by ctx - // - // We'll need the lock for access to the scheduler and NeonVM, and holding it across all the - // request means that our logic can be a little simpler :) - if err := r.requestLock.TryLock(ctx); err != nil { - return err - } - defer r.requestLock.Unlock() - - logger.Info("Updating VM resources", zap.String("reason", string(reason))) - - // A /suspend request from a VM informant will wait until requestLock returns. So we're good to - // make whatever requests we need as long as the informant is here at the start. - // - // The reason we care about the informant server being "enabled" is that the VM informant uses - // it to ensure that there's at most one autoscaler-agent that's making requests on its behalf. - if err := r.validateInformant(); err != nil { - logger.Warn("Unable to update VM resources because informant server is disabled", zap.Error(err)) - return nil - } - - // state variables - var ( - start api.Resources // r.vm.Using(), at the time of the start of this function - for metrics. - target api.Resources - capped api.Resources // target, but capped by r.lastApproved - ) - - if r.schedulerRespondedWithMigration { - logger.Info("Aborting VM resource update because scheduler previously said VM is migrating") - return nil - } - - state, err := func() (*atomicUpdateState, error) { - r.lock.Lock() - defer r.lock.Unlock() - - clearUpdatedMetricsSignal() - - state := r.getStateForVMUpdate(logger, reason) - if state == nil { - // if state == nil, the reason why we can't do the operation was already logged. - return nil, nil - } else if r.scheduler != nil && r.scheduler.fatalError != nil { - logger.Warn("Unable to update VM resources because scheduler had a prior fatal error") - return nil, nil - } - - // Calculate the current and desired state of the VM - target = state.desiredVMState(true) // note: this sets the state value in the loop body - - current := state.vm.Using() - start = current - - msg := "Target VM state is equal to current" - if target != current { - msg = "Target VM state different from current" - } - logger.Info(msg, zap.Object("current", current), zap.Object("target", target)) - - // Check if there's resources that can (or must) be updated before talking to the scheduler. - // - // During typical operation, this only occurs when the target state corresponds to fewer - // compute units than the current state. 
However, this can also happen when: - // - // * lastApproved and target are both greater than the VM's state; or - // * VM's state doesn't match the compute unit and only one resource is being decreased - // - // To make handling these edge-cases smooth, the code here is more generic than typical - // operation requires. - - // note: r.atomicState already checks the validity of r.lastApproved - namely that it has no - // values less than r.vm.Using(). - capped = target.Min(state.lastApproved) // note: this sets the state value in the loop body - - return state, nil - }() - - // note: state == nil means that there's some other reason we couldn't do the operation that - // was already logged. - if err != nil || state == nil { - return err - } - - // If there's an update that can be done immediately, do it! Typically, capped will - // represent the resources we'd like to downscale. - if capped != state.vm.Using() { - // If our downscale gets rejected, calculate a new target - rejectedDownscale := func() (newTarget api.Resources, _ error) { - target = state.desiredVMState(false /* don't allow downscaling */) - return target.Min(state.lastApproved), nil - } - - nowUsing, err := r.doVMUpdate(ctx, logger, state.vm.Using(), capped, rejectedDownscale) - if err != nil { - return fmt.Errorf("Error doing VM update 1: %w", err) - } else if nowUsing == nil { - // From the comment above doVMUpdate: - // - // > If the VM informant is required and unavailable (or becomes unavailable), this - // > method will: return nil, nil; log an appropriate warning; and reset the VM's - // > state to its current value. - // - // So we should just return nil. We can't update right now, and there isn't anything - // left to log. - return nil - } - - state.vm.SetUsing(*nowUsing) - } - - // Fetch the scheduler, to (a) inform it of the current state, and (b) request an - // increase, if we want one. - sched := func() *Scheduler { - r.lock.Lock() - defer r.lock.Unlock() - - clearNewSchedulerSignal() - return r.scheduler - }() - - // If we can't reach the scheduler, then we've already done everything we can. Emit a - // warning and exit. We'll get notified to retry when a new one comes online. - if sched == nil { - logger.Warn("Unable to complete updating VM resources", zap.Error(errors.New("no scheduler registered"))) - return nil - } - - // If the scheduler isn't registered yet, then either the initial register request failed, or it - // hasn't gotten a chance to send it yet. - if !sched.registered { - if err := sched.Register(ctx, logger, func() {}); err != nil { - logger.Error("Error registering with scheduler", zap.Object("scheduler", sched.info), zap.Error(err)) - logger.Warn("Unable to complete updating VM resources", zap.Error(errors.New("scheduler Register request failed"))) - return nil - } - } - - r.recordResourceChange(start, target, r.global.metrics.schedulerRequestedChange) - - request := api.AgentRequest{ - ProtoVersion: PluginProtocolVersion, - Pod: r.podName, - Resources: target, - Metrics: &state.metrics, // FIXME: the metrics here *might* be a little out of date. 
- } - response, err := sched.DoRequest(ctx, logger, &request) - if err != nil { - logger.Error("Scheduler request failed", zap.Object("scheduler", sched.info), zap.Error(err)) - logger.Warn("Unable to complete updating VM resources", zap.Error(errors.New("scheduler request failed"))) - return nil - } else if response.Migrate != nil { - // info about migration has already been logged by DoRequest - return nil - } - - permit := response.Permit - r.recordResourceChange(start, permit, r.global.metrics.schedulerApprovedChange) - - // sched.DoRequest should have validated the permit, meaning that it's not less than the - // current resource usage. - vmUsing := state.vm.Using() - if permit.HasFieldLessThan(vmUsing) { - panic(errors.New("invalid state: permit less than what's in use")) - } else if permit.HasFieldGreaterThan(target) { - panic(errors.New("invalid state: permit greater than target")) - } - - if permit == vmUsing { - if vmUsing != target { - logger.Info("Scheduler denied increase, staying at current", zap.Object("current", vmUsing)) - } - - // nothing to do - return nil - } else /* permit > vmUsing */ { - if permit != target { - logger.Warn("Scheduler capped increase to permit", zap.Object("permit", permit)) - } else { - logger.Info("Scheduler allowed increase to permit", zap.Object("permit", permit)) - } - - rejectedDownscale := func() (newTarget api.Resources, _ error) { - panic(errors.New("rejectedDownscale called but request should be increasing, not decreasing")) - } - if _, err := r.doVMUpdate(ctx, logger, vmUsing, permit, rejectedDownscale); err != nil { - return fmt.Errorf("Error doing VM update 2: %w", err) - } - - return nil - } -} - -// getStateForVMUpdate produces the atomicUpdateState for updateVMResources -// -// This method MUST be called while holding r.lock. -func (r *Runner) getStateForVMUpdate(logger *zap.Logger, updateReason VMUpdateReason) *atomicUpdateState { - if r.lastMetrics == nil { - if updateReason == UpdatedMetrics { - panic(errors.New("invalid state: metrics signalled but r.lastMetrics == nil")) - } - - logger.Warn("Unable to update VM resources because we haven't received metrics yet") - return nil - } else if r.computeUnit == nil { - if updateReason == RegisteredScheduler { - // note: the scheduler that was registered might not be the scheduler we just got! - // However, r.computeUnit is never supposed to go from non-nil to nil, so that doesn't - // actually matter. - panic(errors.New("invalid state: registered scheduler signalled but r.computeUnit == nil")) - } - - // note: as per the docs on r.computeUnit, this should only occur when we haven't yet talked - // to a scheduler. 
- logger.Warn("Unable to update VM resources because r.computeUnit hasn't been set yet") - return nil - } else if r.lastApproved == nil { - panic(errors.New("invalid state: r.computeUnit != nil but r.lastApproved == nil")) - } - - // Check that the VM's current usage is <= lastApproved - if vmUsing := r.vm.Using(); vmUsing.HasFieldGreaterThan(*r.lastApproved) { - panic(fmt.Errorf( - "invalid state: r.vm has resources greater than r.lastApproved (%+v vs %+v)", - vmUsing, *r.lastApproved, - )) - } - - config := r.global.config.Scaling.DefaultConfig - if r.vm.ScalingConfig != nil { - config = *r.vm.ScalingConfig - } - - return &atomicUpdateState{ - computeUnit: *r.computeUnit, - metrics: *r.lastMetrics, - vm: r.vm, - lastApproved: *r.lastApproved, - requestedUpscale: r.requestedUpscale, - config: config, - } -} - -// desiredVMState calculates what the resource allocation to the VM should be, given the metrics and -// current state. -func (s *atomicUpdateState) desiredVMState(allowDecrease bool) api.Resources { - // There's some annoying edge cases that this function has to be able to handle properly. For - // the sake of completeness, they are: - // - // 1. s.vm.Using() is not a multiple of s.computeUnit - // 2. s.vm.Max() is less than s.computeUnit (or: has at least one resource that is) - // 3. s.vm.Using() is a fractional multiple of s.computeUnit, but !allowDecrease and rounding up - // is greater than s.vm.Max() - // 4. s.vm.Using() is much larger than s.vm.Min() and not a multiple of s.computeUnit, but load - // is low so we should just decrease *anyways*. - // - // --- - // - // Broadly, the implementation works like this: - // 1. Based on load average, calculate the "goal" number of CPUs (and therefore compute units) - // 2. Cap the goal CU by min/max, etc - // 3. that's it! - - // Goal compute unit is at the point where (CPUs) × (LoadAverageFractionTarget) == (load - // average), - // which we can get by dividing LA by LAFT. - goalCU := uint32(math.Round(float64(s.metrics.LoadAverage1Min) / s.config.LoadAverageFractionTarget)) - - // Update goalCU based on any requested upscaling - goalCU = util.Max(goalCU, s.requiredCUForRequestedUpscaling()) - - // new CU must be >= current CU if !allowDecrease - if !allowDecrease { - _, upperBoundCU := s.computeUnitsBounds() - goalCU = util.Max(goalCU, upperBoundCU) - } - - // resources for the desired "goal" compute units - goal := s.computeUnit.Mul(uint16(goalCU)) - - // bound goal by the minimum and maximum resource amounts for the VM - result := goal.Min(s.vm.Max()).Max(s.vm.Min()) - - // Check that the result is sound. - // - // With the current (naive) implementation, this is trivially ok. In future versions, it might - // not be so simple, so it's good to have this integrity check here. - if result.HasFieldGreaterThan(s.vm.Max()) { - panic(fmt.Errorf( - "produced invalid desiredVMState: result has field greater than max. this = %+v", *s, - )) - } else if result.HasFieldLessThan(s.vm.Min()) { - panic(fmt.Errorf( - "produced invalid desiredVMState: result has field less than min. this = %+v", *s, - )) - } - - return result -} - -// computeUnitsBounds returns the minimum and maximum number of Compute Units required to fit each -// resource for the VM's current allocation -// -// Under typical operation, this will just return two equal values, both of which are equal to the -// VM's current number of Compute Units. 
However, if the VM's resource allocation doesn't cleanly -// divide to a multiple of the Compute Unit, the upper and lower bounds will be different. This can -// happen when the Compute Unit is changed, or when the VM's maximum or minimum resource allocations -// has previously prevented it from being set to a multiple of the Compute Unit. -func (s *atomicUpdateState) computeUnitsBounds() (uint32, uint32) { - // (x + M-1) / M is equivalent to ceil(x/M), as long as M != 0, which is already guaranteed by - // the - minCPUUnits := (uint32(s.vm.Cpu.Use) + uint32(s.computeUnit.VCPU) - 1) / uint32(s.computeUnit.VCPU) - minMemUnits := uint32((s.vm.Mem.Use + s.computeUnit.Mem - 1) / s.computeUnit.Mem) - - return util.Min(minCPUUnits, minMemUnits), util.Max(minCPUUnits, minMemUnits) -} - -// requiredCUForRequestedUpscaling returns the minimum Compute Units required to abide by the -// requested upscaling, if there is any. -// -// If there's no requested upscaling, then this method will return zero. -// -// This method does not respect any bounds on Compute Units placed by the VM's maximum or minimum -// resource allocation. -func (s *atomicUpdateState) requiredCUForRequestedUpscaling() uint32 { - var required uint32 - - // note: floor(x / M) + 1 gives the minimum integer value greater than x / M. - - if s.requestedUpscale.Cpu { - required = util.Max(required, uint32(s.vm.Cpu.Use/s.computeUnit.VCPU)+1) - } - if s.requestedUpscale.Memory { - required = util.Max(required, uint32(s.vm.Mem.Use/s.computeUnit.Mem)+1) - } - - return required -} - -// doVMUpdate handles updating the VM's resources from current to target WITHOUT CHECKING WITH THE -// SCHEDULER. It is the caller's responsibility to ensure that target is not greater than -// r.lastApproved, and check with the scheduler if necessary. -// -// If the VM informant is required and unavailable (or becomes unavailable), this method will: -// return nil, nil; log an appropriate warning; and reset the VM's state to its current value. -// -// If some resources in target are less than current, and the VM informant rejects the proposed -// downscaling, rejectedDownscale will be called. If it returns an error, that error will be -// returned and the update will be aborted. Otherwise, the returned newTarget will be used. -// -// This method MUST be called while holding r.requestLock AND NOT r.lock. -func (r *Runner) doVMUpdate( - ctx context.Context, - logger *zap.Logger, - current api.Resources, - target api.Resources, - rejectedDownscale func() (newTarget api.Resources, _ error), -) (*api.Resources, error) { - logger.Info("Attempting VM update", zap.Object("current", current), zap.Object("target", target)) - - // helper handling function to reset r.vm to reflect the actual current state. Must not be - // called while holding r.lock. - resetVMTo := func(amount api.Resources) { - r.lock.Lock() - defer r.lock.Unlock() - - r.vm.SetUsing(amount) - } - - if err := r.validateInformant(); err != nil { - logger.Warn("Aborting VM update because informant server is not valid", zap.Error(err)) - resetVMTo(current) - return nil, nil - } - - // If there's any fields that are being downscaled, request that from the VM informant. 
- downscaled := current.Min(target) - if downscaled != current { - r.recordResourceChange(current, downscaled, r.global.metrics.informantRequestedChange) - - resp, err := r.doInformantDownscale(ctx, logger, downscaled) - if err != nil || resp == nil /* resp = nil && err = nil when the error has been handled */ { - return nil, err - } - - if resp.Ok { - r.recordResourceChange(current, downscaled, r.global.metrics.informantApprovedChange) - } else { - newTarget, err := rejectedDownscale() - if err != nil { - resetVMTo(current) - return nil, err - } else if newTarget.HasFieldLessThan(current) { - panic(fmt.Errorf( - "rejectedDownscale returned new target less than current: %+v has field less than %+v", - newTarget, current, - )) - } - - if newTarget != target { - logger.Info("VM update: rejected downscale changed target", zap.Object("target", newTarget)) - } - - target = newTarget - } - } - - r.recordResourceChange(downscaled, target, r.global.metrics.neonvmRequestedChange) - - // Make the NeonVM request +func (r *Runner) doNeonVMRequest(ctx context.Context, target api.Resources) error { patches := []util.JSONPatch{{ Op: util.PatchReplace, Path: "/spec/guest/cpus/use", @@ -1499,8 +935,6 @@ func (r *Runner) doVMUpdate( Value: target.Mem, }} - logger.Info("Making NeonVM request for resources", zap.Object("target", target), zap.Any("patches", patches)) - patchPayload, err := json.Marshal(patches) if err != nil { panic(fmt.Errorf("Error marshalling JSON patch: %w", err)) @@ -1516,58 +950,13 @@ func (r *Runner) doVMUpdate( _, err = r.global.vmClient.NeonvmV1().VirtualMachines(r.vm.Namespace). Patch(requestCtx, r.vm.Name, ktypes.JSONPatchType, patchPayload, metav1.PatchOptions{}) - // We couldn't update the VM if err != nil { r.global.metrics.neonvmRequestsOutbound.WithLabelValues(fmt.Sprintf("[error: %s]", util.RootError(err))).Inc() - - // If the context was cancelled, we generally don't need to worry about whether setting r.vm - // back to current is sound. All operations on this VM are done anyways. - if ctx.Err() != nil { - resetVMTo(current) // FIXME: yeah, even though the comment above says "don't worry", maybe worry? - return nil, fmt.Errorf("Error making VM patch request: %w", err) - } - - // Otherwise, something went wrong *in the request itself*. This probably leaves us in an - // inconsistent state, so we're best off ending all further operations. The correct way to - // fatally error is by panicking - our infra here ensures it won't take down any other - // runners. - panic(fmt.Errorf("Unexpected VM patch request failure: %w", err)) + return err } r.global.metrics.neonvmRequestsOutbound.WithLabelValues("ok").Inc() - - // We scaled. If we run into an issue around further communications with the informant, then - // it'll be left with an inconsistent state - there's not really anything we can do about that, - // unfortunately. - resetVMTo(target) - - upscaled := target // we already handled downscaling; only upscaling can be left - if upscaled.HasFieldGreaterThan(current) { - // Unset fields in r.requestedUpscale if we've handled it. 
- // - // Essentially, for each field F, set: - // - // r.requestedUpscale.F = r.requestedUpscale && !(upscaled.F > current.F) - func() { - r.lock.Lock() - defer r.lock.Unlock() - - r.requestedUpscale = r.requestedUpscale.And(upscaled.IncreaseFrom(current).Not()) - }() - - r.recordResourceChange(downscaled, upscaled, r.global.metrics.informantRequestedChange) - - if ok, err := r.doInformantUpscale(ctx, logger, upscaled); err != nil || !ok { - return nil, err - } - - r.recordResourceChange(downscaled, upscaled, r.global.metrics.informantApprovedChange) - } - - logger.Info("Updated VM resources", zap.Object("current", current), zap.Object("target", target)) - - // Everything successful. - return &target, nil + return nil } func (r *Runner) recordResourceChange(current, target api.Resources, metrics resourceChangePair) { @@ -1601,141 +990,26 @@ func (r *Runner) recordResourceChange(current, target api.Resources, metrics res } } -// validateInformant checks that the Runner's informant server is present AND active (i.e. not -// suspended). -// -// If either condition is false, this method returns error. This is typically used to check that the -// Runner is enabled before making a request to NeonVM or the scheduler, in which case holding -// r.requestLock is advised. -// -// This method MUST NOT be called while holding r.lock. -func (r *Runner) validateInformant() error { - r.lock.Lock() - defer r.lock.Unlock() - - if r.server == nil { - return errors.New("no informant server set") - } - return r.server.valid() -} - -// doInformantDownscale is a convenience wrapper around (*InformantServer).Downscale that locks r, -// checks if r.server is nil, and does the request. -// -// Some errors are logged by this method instead of being returned. If that happens, this method -// returns nil, nil. -// -// This method MUST NOT be called while holding r.lock. -func (r *Runner) doInformantDownscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error) { - msg := "Error requesting informant downscale" - - server := func() *InformantServer { - r.lock.Lock() - defer r.lock.Unlock() - return r.server - }() - if server == nil { - return nil, fmt.Errorf("%s: InformantServer is not set (this should not occur after startup)", msg) - } - - resp, err := server.Downscale(ctx, logger, to) - if err != nil { - if IsNormalInformantError(err) { - logger.Warn(msg, zap.Object("server", server.desc), zap.Error(err)) - return nil, nil - } else { - return nil, fmt.Errorf("%s: %w", msg, err) - } - } - - return resp, nil -} - -// doInformantDownscale is a convenience wrapper around (*InformantServer).Upscale that locks r, -// checks if r.server is nil, and does the request. -// -// Some errors are logged by this method instead of being returned. If that happens, this method -// returns false, nil. -// -// This method MUST NOT be called while holding r.lock. 
-func (r *Runner) doInformantUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) (ok bool, _ error) { - msg := "Error notifying informant of upscale" - - server := func() *InformantServer { - r.lock.Lock() - defer r.lock.Unlock() - return r.server - }() - if server == nil { - return false, fmt.Errorf("%s: InformantServer is not set (this should not occur after startup)", msg) - } - - if err := server.Upscale(ctx, logger, to); err != nil { - if IsNormalInformantError(err) { - logger.Warn(msg, zap.Error(err)) - return false, nil - } else { - return false, fmt.Errorf("%s: %w", msg, err) - } - } - - return true, nil -} - -// Register performs the initial request required to register with a scheduler -// -// This method is called immediately after the Scheduler is created, and may be called -// subsequent times if the initial request fails. -// -// signalOk will be called if the request succeeds, with s.runner.lock held - but only if -// s.runner.scheduler == s. -// -// This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock -func (s *Scheduler) Register(ctx context.Context, logger *zap.Logger, signalOk func()) error { - metrics, resources := func() (*api.Metrics, api.Resources) { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - return s.runner.lastMetrics, s.runner.vm.Using() - }() - - req := api.AgentRequest{ +// DoRequest sends a request to the scheduler and does not validate the response. +func (s *Scheduler) DoRequest( + ctx context.Context, + logger *zap.Logger, + resources api.Resources, + metrics *api.Metrics, +) (_ *api.PluginResponse, err error) { + reqData := &api.AgentRequest{ ProtoVersion: PluginProtocolVersion, Pod: s.runner.podName, Resources: resources, Metrics: metrics, } - if _, err := s.DoRequest(ctx, logger, &req); err != nil { - return err - } - - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - s.registered = true - if s.runner.scheduler == s { - signalOk() - } - - return nil -} -// SendRequest implements all of the tricky logic for requests sent to the scheduler plugin -// -// This method checks: -// * That the response is semantically valid -// * That the response matches with the state of s.runner.vm, if s.runner.scheduler == s -// -// This method may set: -// - s.fatalError -// - s.runner.{computeUnit,lastApproved,lastSchedulerError,schedulerRespondedWithMigration}, -// if s.runner.scheduler == s. -// -// This method MAY ALSO call s.runner.shutdown(), if s.runner.scheduler == s. -// -// This method MUST be called while holding s.runner.requestLock AND NOT s.runner.lock. 
-func (s *Scheduler) DoRequest(ctx context.Context, logger *zap.Logger, reqData *api.AgentRequest) (*api.PluginResponse, error) { - logger = logger.With(zap.Object("scheduler", s.info)) + // make sure we log any error we're returning: + defer func() { + if err != nil { + logger.Error("Scheduler request failed", zap.Error(err)) + } + }() reqBody, err := json.Marshal(reqData) if err != nil { @@ -1754,7 +1028,7 @@ func (s *Scheduler) DoRequest(ctx context.Context, logger *zap.Logger, reqData * } request.Header.Set("content-type", "application/json") - logger.Info("Sending AgentRequest", zap.Any("request", reqData)) + logger.Info("Sending request to scheduler", zap.Any("request", reqData)) response, err := http.DefaultClient.Do(request) if err != nil { @@ -1794,89 +1068,11 @@ func (s *Scheduler) DoRequest(ctx context.Context, logger *zap.Logger, reqData * return nil, s.handleRequestError(reqData, fmt.Errorf("Bad JSON response: %w", err)) } - logger.Info("Received PluginResponse", zap.Any("response", respData)) - - s.runner.lock.Lock() - locked := true - defer func() { - if locked { - s.runner.lock.Unlock() - } - }() - - if err := s.validatePluginResponse(logger, reqData, &respData); err != nil { - // Must unlock before handling because it's required by validatePluginResponse, but - // handleFatalError is required not to have it. - locked = false - s.runner.lock.Unlock() - - // Fatal, because an invalid response indicates mismatched state, so we can't assume - // anything about the plugin's state. - return nil, s.handleFatalError(reqData, fmt.Errorf("Semantically invalid response: %w", err)) - } - - // if this scheduler is still current, update all the relevant fields in s.runner - if s.runner.scheduler == s { - s.runner.computeUnit = &respData.ComputeUnit - s.runner.lastApproved = &respData.Permit - s.runner.lastSchedulerError = nil - if respData.Migrate != nil { - logger.Info("Shutting down Runner because scheduler response indicated migration started") - s.runner.schedulerRespondedWithMigration = true - s.runner.shutdown() - } - } + logger.Info("Received response from scheduler", zap.Any("response", respData)) return &respData, nil } -// validatePluginResponse checks that the PluginResponse is valid for the AgentRequest that was -// sent. -// -// This method will not update any fields in s or s.runner. -// -// This method MUST be called while holding s.runner.requestLock AND s.runner.lock. -func (s *Scheduler) validatePluginResponse( - logger *zap.Logger, - req *api.AgentRequest, - resp *api.PluginResponse, -) error { - isCurrent := s.runner.scheduler == s - - if err := req.Resources.ValidateNonZero(); err != nil { - panic(fmt.Errorf("we created an invalid AgentRequest.Resources: %w", err)) - } - - // Errors from resp alone - if err := resp.Permit.ValidateNonZero(); err != nil { - return fmt.Errorf("Invalid permit: %w", err) - } - if err := resp.ComputeUnit.ValidateNonZero(); err != nil { - return fmt.Errorf("Invalid compute unit: %w", err) - } - - // Errors from resp in connection with the prior request - if resp.Permit.HasFieldGreaterThan(req.Resources) { - return fmt.Errorf( - "Permit has resources greater than request (%+v vs. 
%+v)", - resp.Permit, req.Resources, - ) - } - - // Errors from resp in connection with the prior request AND the VM state - if isCurrent { - if vmUsing := s.runner.vm.Using(); resp.Permit.HasFieldLessThan(vmUsing) { - return fmt.Errorf("Permit has resources less than VM (%+v vs %+v)", resp.Permit, vmUsing) - } - } - - if !isCurrent && resp.Migrate != nil { - logger.Warn("scheduler is no longer current, but its response signalled migration") - } - - return nil -} - // handlePreRequestError appropriately handles updating the Scheduler and its Runner's state to // reflect that an error occurred. It returns the error passed to it // @@ -1891,7 +1087,7 @@ func (s *Scheduler) handlePreRequestError(err error) error { s.runner.lock.Lock() defer s.runner.lock.Unlock() - if s.runner.scheduler == s { + if s.runner.scheduler.Load() == s { s.runner.lastSchedulerError = err } @@ -1912,7 +1108,7 @@ func (s *Scheduler) handleRequestError(req *api.AgentRequest, err error) error { s.runner.lock.Lock() defer s.runner.lock.Unlock() - if s.runner.scheduler == s { + if s.runner.scheduler.Load() == s { s.runner.lastSchedulerError = err // Because downscaling s.runner.vm must be done before any request that decreases its @@ -1947,7 +1143,7 @@ func (s *Scheduler) handleFatalError(req *api.AgentRequest, err error) error { s.fatalError = err - if s.runner.scheduler == s { + if s.runner.scheduler.Load() == s { s.runner.lastSchedulerError = err // for reasoning on lastApproved, see handleRequestError. lastApproved := s.runner.vm.Using() diff --git a/pkg/plugin/config.go b/pkg/plugin/config.go index cfb7b1090..40f3ff093 100644 --- a/pkg/plugin/config.go +++ b/pkg/plugin/config.go @@ -233,6 +233,7 @@ func (c *nodeConfig) vCpuLimits(total *resource.Quantity) (_ nodeResourceState[v System: vmapi.MilliCPU(systemCpus * 1000), Watermark: vmapi.MilliCPU(c.Cpu.Watermark * float32(reservableCpus) * 1000), Reserved: 0, + Buffer: 0, CapacityPressure: 0, PressureAccountedFor: 0, }, margin, nil @@ -277,6 +278,7 @@ func (c *nodeConfig) memoryLimits( System: uint16(systemSlots), Watermark: uint16(c.Memory.Watermark * float32(reservableSlots)), Reserved: 0, + Buffer: 0, CapacityPressure: 0, PressureAccountedFor: 0, }, margin, nil diff --git a/pkg/util/broadcast.go b/pkg/util/broadcast.go new file mode 100644 index 000000000..a11d8e03a --- /dev/null +++ b/pkg/util/broadcast.go @@ -0,0 +1,72 @@ +package util + +// A channel-based sync.Cond-like interface, with support for broadcast operations (but some +// additional restrictions) + +import ( + "sync" +) + +func NewBroadcaster() *Broadcaster { + return &Broadcaster{ + mu: sync.Mutex{}, + ch: make(chan struct{}, 1), + sent: 0, + } +} + +type Broadcaster struct { + mu sync.Mutex + ch chan struct{} + + sent uint64 +} + +type BroadcastReceiver struct { + b *Broadcaster + + viewed uint64 +} + +func (b *Broadcaster) Broadcast() { + b.mu.Lock() + defer b.mu.Unlock() + + close(b.ch) + b.ch = make(chan struct{}, 1) + b.sent += 1 +} + +func (b *Broadcaster) NewReceiver() BroadcastReceiver { + b.mu.Lock() + defer b.mu.Unlock() + + return BroadcastReceiver{ + b: b, + viewed: b.sent, + } +} + +var closedChannel = func() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +}() + +func (r *BroadcastReceiver) Wait() <-chan struct{} { + r.b.mu.Lock() + defer r.b.mu.Unlock() + + if r.b.sent == r.viewed { + return r.b.ch + } else { + return closedChannel + } +} + +func (r *BroadcastReceiver) Awake() { + r.b.mu.Lock() + defer r.b.mu.Unlock() + + r.viewed = r.b.sent +} diff --git 
a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go index c8b319f89..859c38b96 100644 --- a/pkg/util/watch/watch.go +++ b/pkg/util/watch/watch.go @@ -149,12 +149,14 @@ func Watch[C Client[L], L metav1.ListMetaAccessor, T any, P Object[T]]( sendStop, stopSignal := util.NewSingleSignalPair() store := Store[T]{ + mutex: sync.Mutex{}, objects: make(map[types.UID]*T), triggerRelist: make(chan struct{}, 1), // ensure sends are non-blocking relisted: make(chan struct{}), nextIndexID: 0, indexes: make(map[uint64]Index[T]), stopSignal: sendStop, + stopped: atomic.Bool{}, } items := accessors.Items(initialList)
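
A minimal usage sketch for the new util.Broadcaster added above (illustrative only, not
part of this patch; the function name, package, and onUpdate callback are hypothetical).
A consumer holds a BroadcastReceiver, blocks on Wait() until a broadcast arrives, and
calls Awake() to acknowledge it before re-reading whatever shared state changed:

    package example // hypothetical consumer, not in this patch

    import (
        "context"

        "github.com/neondatabase/autoscaling/pkg/util"
    )

    // consume blocks until the Broadcaster signals an update, acknowledges the
    // broadcast, and then runs the (hypothetical) onUpdate callback.
    func consume(ctx context.Context, updates *util.Broadcaster, onUpdate func()) {
        recv := updates.NewReceiver()
        for {
            select {
            case <-ctx.Done():
                return
            case <-recv.Wait():
                recv.Awake() // mark all broadcasts so far as seen
                onUpdate()
            }
        }
    }

This matches the semantics of the code in pkg/util/broadcast.go: Broadcast() closes the
current channel and swaps in a fresh one, so a receiver that is up to date unblocks on
the next Broadcast(), while a receiver that has missed one or more broadcasts gets an
already-closed channel back from Wait() and proceeds immediately.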