Skip to content

Commit

Permalink
agent: Rewrite scaling logic into pkg/agent/{core,executor}
Browse files Browse the repository at this point in the history
At a very high level, this work replaces (*Runner).handleVMResources(),
moving from an imperative style to an explicit state machine.
This new version is more complicated, but ultimately more flexible and
easier to extend.

The decision-making "core" of the scaling logic is implemented by
(*core.State).NextActions(), which returns an ActionSet indicating what
the "caller" should do. NextActions() is a pure function, making this
easier to test - at least, in theory.

That method is called and cached by executor.ExecutorCore, where there's
a few different threads (each defined in exec_*.go) responsible for
implementing the communications with the other components - namely, the
scheduler plugin, vm-informant, and NeonVM k8s API.

The various "executor" threads are written generically, using dedicated
interfaces (e.g. PluginInterface / PluginHandle) that are implemented in
pkg/agent/execbridge.go.
  • Loading branch information
sharnoff committed Sep 28, 2023
1 parent ddb6671 commit 0faed63
Show file tree
Hide file tree
Showing 18 changed files with 2,136 additions and 1,045 deletions.
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ run:
skip-dirs:
- neonvm

issues:
exclude:
# ChanMutex contains only a channel, which *is* safe to copy
- 'copylocks: return copies lock value: github\.com/neondatabase/autoscaling/pkg/util\.ChanMutex'

output:
format: colored-line-number
print-issued-lines: true
Expand Down Expand Up @@ -56,8 +61,11 @@ linters-settings:
- '^github.com/prometheus/client_golang/prometheus(/.*)?\.\w+Opts$'
- '^github\.com/containerd/cgroups/v3/cgroup2\.(Resources|Memory)'
- '^github\.com/tychoish/fun/pubsub\.BrokerOptions$'
- '^github\.com/neondatabase/autoscaling/pkg/util\.JSONPatch$'
- '^github\.com/neondatabase/autoscaling/pkg/util/watch\.HandlerFuncs$'
# vmapi.{VirtualMachine,VirtualMachineSpec,VirtualMachineMigration,VirtualMachineMigrationSpec}
- '^github\.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1\.VirtualMachine(Migration)?(Spec)?$'
- '^github\.com/neondatabase/autoscaling/pkg/agent/core\.Action$'

# see: <https://golangci-lint.run/usage/linters/#gci>
gci:
Expand Down
3 changes: 3 additions & 0 deletions deploy/agent/config_map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ data:
"serverPort": 10301,
"retryServerMinWaitSeconds": 5,
"retryServerNormalWaitSeconds": 5,
"retryDeniedDownscaleSeconds": 5,
"retryFailedRequestSeconds": 3,
"registerRetrySeconds": 5,
"requestTimeoutSeconds": 1,
"registerTimeoutSeconds": 2,
Expand All @@ -31,6 +33,7 @@ data:
"scheduler": {
"schedulerName": "autoscale-scheduler",
"requestTimeoutSeconds": 2,
"requestAtLeastEverySeconds": 5,
"requestPort": 10299
},
"dumpState": {
Expand Down
48 changes: 31 additions & 17 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
)

type Config struct {
DumpState *DumpStateConfig `json:"dumpState"`
Scaling ScalingConfig `json:"scaling"`
Informant InformantConfig `json:"informant"`
Metrics MetricsConfig `json:"metrics"`
Scheduler SchedulerConfig `json:"scheduler"`
Billing *billing.Config `json:"billing,omitempty"`
DumpState *DumpStateConfig `json:"dumpState"`
}

// DumpStateConfig configures the endpoint to dump all internal state
Expand Down Expand Up @@ -54,6 +54,13 @@ type InformantConfig struct {
// register request.
RegisterRetrySeconds uint `json:"registerRetrySeconds"`

// RetryFailedRequestSeconds gives the duration, in seconds, that we must wait before retrying a
// request that previously failed.
RetryFailedRequestSeconds uint `json:"retryFailedRequestSeconds"`
// RetryDeniedDownscaleSeconds gives the duration, in seconds, that we must wait before retrying
// a downscale request that was previously denied
RetryDeniedDownscaleSeconds uint `json:"retryDeniedDownscaleSeconds"`

// RequestTimeoutSeconds gives the timeout for any individual request to the informant, except
// for those with separately-defined values below.
RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
Expand Down Expand Up @@ -98,6 +105,9 @@ type SchedulerConfig struct {
//
// If zero, requests will have no timeout.
RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
// RequestAtLeastEverySeconds gives the maximum duration we should go without attempting a
// request to the scheduler, even if nothing's changed.
RequestAtLeastEverySeconds uint `json:"requestAtLeastEverySeconds"`
// RequestPort defines the port to access the scheduler's ✨special✨ API with
RequestPort uint16 `json:"requestPort"`
}
Expand Down Expand Up @@ -131,31 +141,35 @@ func (c *Config) validate() error {
zeroTmpl = "field %q cannot be zero"
)

erc.Whenf(ec, c.Billing != nil && c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName")
erc.Whenf(ec, c.Billing != nil && c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName")
erc.Whenf(ec, c.Billing != nil && c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds")
erc.Whenf(ec, c.Billing != nil && c.Billing.PushEverySeconds == 0, zeroTmpl, ".billing.pushEverySeconds")
erc.Whenf(ec, c.Billing != nil && c.Billing.PushTimeoutSeconds == 0, zeroTmpl, ".billing.pushTimeoutSeconds")
erc.Whenf(ec, c.Billing != nil && c.Billing.URL == "", emptyTmpl, ".billing.url")
erc.Whenf(ec, c.DumpState != nil && c.DumpState.Port == 0, zeroTmpl, ".dumpState.port")
erc.Whenf(ec, c.DumpState != nil && c.DumpState.TimeoutSeconds == 0, zeroTmpl, ".dumpState.timeoutSeconds")
erc.Whenf(ec, c.Informant.DownscaleTimeoutSeconds == 0, zeroTmpl, ".informant.downscaleTimeoutSeconds")
erc.Whenf(ec, c.Informant.RegisterRetrySeconds == 0, zeroTmpl, ".informant.registerRetrySeconds")
erc.Whenf(ec, c.Informant.RegisterTimeoutSeconds == 0, zeroTmpl, ".informant.registerTimeoutSeconds")
erc.Whenf(ec, c.Informant.RequestTimeoutSeconds == 0, zeroTmpl, ".informant.requestTimeoutSeconds")
erc.Whenf(ec, c.Scaling.RequestTimeoutSeconds == 0, zeroTmpl, ".scaling.requestTimeoutSeconds")
// add all errors if there are any: https://github.com/neondatabase/autoscaling/pull/195#discussion_r1170893494
ec.Add(c.Scaling.DefaultConfig.Validate())
erc.Whenf(ec, c.Informant.ServerPort == 0, zeroTmpl, ".informant.serverPort")
erc.Whenf(ec, c.Informant.RetryServerMinWaitSeconds == 0, zeroTmpl, ".informant.retryServerMinWaitSeconds")
erc.Whenf(ec, c.Informant.RetryServerNormalWaitSeconds == 0, zeroTmpl, ".informant.retryServerNormalWaitSeconds")
erc.Whenf(ec, c.Informant.ServerPort == 0, zeroTmpl, ".informant.serverPort")
erc.Whenf(ec, c.Informant.RegisterRetrySeconds == 0, zeroTmpl, ".informant.registerRetrySeconds")
erc.Whenf(ec, c.Informant.RetryFailedRequestSeconds == 0, zeroTmpl, ".informant.retryFailedRequestSeconds")
erc.Whenf(ec, c.Informant.RetryDeniedDownscaleSeconds == 0, zeroTmpl, ".informant.retryDeniedDownscaleSeconds")
erc.Whenf(ec, c.Informant.RequestTimeoutSeconds == 0, zeroTmpl, ".informant.requestTimeoutSeconds")
erc.Whenf(ec, c.Informant.RegisterTimeoutSeconds == 0, zeroTmpl, ".informant.registerTimeoutSeconds")
erc.Whenf(ec, c.Informant.DownscaleTimeoutSeconds == 0, zeroTmpl, ".informant.downscaleTimeoutSeconds")
erc.Whenf(ec, c.Informant.UnhealthyAfterSilenceDurationSeconds == 0, zeroTmpl, ".informant.unhealthyAfterSilenceDurationSeconds")
erc.Whenf(ec, c.Informant.UnhealthyStartupGracePeriodSeconds == 0, zeroTmpl, ".informant.unhealthyStartupGracePeriodSeconds")
erc.Whenf(ec, c.Metrics.LoadMetricPrefix == "", emptyTmpl, ".metrics.loadMetricPrefix")
erc.Whenf(ec, c.Metrics.RequestTimeoutSeconds == 0, zeroTmpl, ".metrics.requestTimeoutSeconds")
erc.Whenf(ec, c.Metrics.SecondsBetweenRequests == 0, zeroTmpl, ".metrics.secondsBetweenRequests")
erc.Whenf(ec, c.Scaling.RequestTimeoutSeconds == 0, zeroTmpl, ".scaling.requestTimeoutSeconds")
// add all errors if there are any: https://github.com/neondatabase/autoscaling/pull/195#discussion_r1170893494
ec.Add(c.Scaling.DefaultConfig.Validate())
erc.Whenf(ec, c.Scheduler.RequestPort == 0, zeroTmpl, ".scheduler.requestPort")
erc.Whenf(ec, c.Scheduler.RequestTimeoutSeconds == 0, zeroTmpl, ".scheduler.requestTimeoutSeconds")
erc.Whenf(ec, c.Scheduler.SchedulerName == "", emptyTmpl, ".scheduler.schedulerName")
// note: c.Scheduler.RequestTimeoutSeconds == 0 is valid
erc.Whenf(ec, c.Scheduler.RequestAtLeastEverySeconds == 0, zeroTmpl, ".scheduler.requestAtLeastEverySeconds")
erc.Whenf(ec, c.Scheduler.RequestPort == 0, zeroTmpl, ".scheduler.requestPort")
erc.Whenf(ec, c.Billing != nil && c.Billing.URL == "", emptyTmpl, ".billing.url")
erc.Whenf(ec, c.Billing != nil && c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName")
erc.Whenf(ec, c.Billing != nil && c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName")
erc.Whenf(ec, c.Billing != nil && c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds")
erc.Whenf(ec, c.Billing != nil && c.Billing.PushEverySeconds == 0, zeroTmpl, ".billing.pushEverySeconds")
erc.Whenf(ec, c.Billing != nil && c.Billing.PushTimeoutSeconds == 0, zeroTmpl, ".billing.pushTimeoutSeconds")

return ec.Resolve()
}
40 changes: 40 additions & 0 deletions pkg/agent/core/action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package core

import (
"time"

"github.com/neondatabase/autoscaling/pkg/api"
)

type ActionSet struct {
Wait *ActionWait `json:"wait,omitempty"`
PluginRequest *ActionPluginRequest `json:"pluginRequest,omitempty"`
NeonVMRequest *ActionNeonVMRequest `json:"neonvmRequest,omitempty"`
InformantDownscale *ActionInformantDownscale `json:"informantDownscale,omitempty"`
InformantUpscale *ActionInformantUpscale `json:"informantUpscale,omitempty"`
}

type ActionWait struct {
Duration time.Duration `json:"duration"`
}

type ActionPluginRequest struct {
LastPermit *api.Resources `json:"current"`
Target api.Resources `json:"target"`
Metrics *api.Metrics `json:"metrics"`
}

type ActionNeonVMRequest struct {
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
}

type ActionInformantDownscale struct {
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
}

type ActionInformantUpscale struct {
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
}
142 changes: 142 additions & 0 deletions pkg/agent/core/dumpstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package core

// Implementation of (*UpdateState).Dump()

import (
"time"

"github.com/neondatabase/autoscaling/pkg/api"
)

func shallowCopy[T any](ptr *T) *T {
if ptr == nil {
return nil
} else {
x := *ptr
return &x
}
}

// StateDump provides introspection into the current values of the fields of State
type StateDump struct {
Config Config `json:"config"`
VM api.VmInfo `json:"vm"`
Plugin pluginStateDump `json:"plugin"`
Informant informantStateDump `json:"informant"`
NeonVM neonvmStateDump `json:"neonvm"`
Metrics *api.Metrics `json:"metrics"`
}

// Dump produces a JSON-serializable representation of the State
func (s *State) Dump() StateDump {
return StateDump{
Config: s.config,
VM: s.vm,
Plugin: s.plugin.dump(),
Informant: s.informant.dump(),
NeonVM: s.neonvm.dump(),
Metrics: shallowCopy(s.metrics),
}
}

type pluginStateDump struct {
Alive bool `json:"alive"`
OngoingRequest bool `json:"ongoingRequest"`
ComputeUnit *api.Resources `json:"computeUnit"`
LastRequest *pluginRequestedDump `json:"lastRequest"`
Permit *api.Resources `json:"permit"`
}
type pluginRequestedDump struct {
At time.Time `json:"time"`
Resources api.Resources `json:"resources"`
}

func (s *pluginState) dump() pluginStateDump {
var lastRequest *pluginRequestedDump
if s.lastRequest != nil {
lastRequest = &pluginRequestedDump{
At: s.lastRequest.at,
Resources: s.lastRequest.resources,
}
}

return pluginStateDump{
Alive: s.alive,
OngoingRequest: s.ongoingRequest,
ComputeUnit: shallowCopy(s.computeUnit),
LastRequest: lastRequest,
Permit: shallowCopy(s.permit),
}
}

type informantStateDump struct {
Active bool `json:"active"`
OngoingRequest *OngoingInformantRequestDump `json:"ongoingRequest"`
RequestedUpscale *requestedUpscaleDump `json:"requestedUpscale"`
DeniedDownscale *deniedDownscaleDump `json:"deniedDownscale"`
Approved *api.Resources `json:"approved"`
DownscaleFailureAt *time.Time `json:"downscaleFailureAt"`
UpscaleFailureAt *time.Time `json:"upscaleFailureAt"`
}
type OngoingInformantRequestDump struct {
Kind informantRequestKind `json:"kind"`
}
type requestedUpscaleDump struct {
At time.Time `json:"at"`
Base api.Resources `json:"base"`
Requested api.MoreResources `json:"requested"`
}
type deniedDownscaleDump struct {
At time.Time `json:"at"`
Requested api.Resources `json:"requested"`
}

func (s *informantState) dump() informantStateDump {
var requestedUpscale *requestedUpscaleDump
if s.requestedUpscale != nil {
requestedUpscale = &requestedUpscaleDump{
At: s.requestedUpscale.at,
Base: s.requestedUpscale.base,
Requested: s.requestedUpscale.requested,
}
}

var deniedDownscale *deniedDownscaleDump
if s.deniedDownscale != nil {
deniedDownscale = &deniedDownscaleDump{
At: s.deniedDownscale.at,
Requested: s.deniedDownscale.requested,
}
}

var ongoingRequest *OngoingInformantRequestDump
if s.ongoingRequest != nil {
ongoingRequest = &OngoingInformantRequestDump{
Kind: s.ongoingRequest.kind,
}
}

return informantStateDump{
Active: s.active,
OngoingRequest: ongoingRequest,
RequestedUpscale: requestedUpscale,
DeniedDownscale: deniedDownscale,
Approved: shallowCopy(s.approved),
DownscaleFailureAt: shallowCopy(s.downscaleFailureAt),
UpscaleFailureAt: shallowCopy(s.upscaleFailureAt),
}
}

type neonvmStateDump struct {
LastSuccess *api.Resources `json:"lastSuccess"`
OngoingRequested *api.Resources `json:"ongoingRequested"`
RequestFailedAt *time.Time `json:"requestFailedAt"`
}

func (s *neonvmState) dump() neonvmStateDump {
return neonvmStateDump{
LastSuccess: shallowCopy(s.lastSuccess),
OngoingRequested: shallowCopy(s.ongoingRequested),
RequestFailedAt: shallowCopy(s.requestFailedAt),
}
}
Loading

0 comments on commit 0faed63

Please sign in to comment.