diff --git a/neonvm/apis/neonvm/v1/virtualmachine_types.go b/neonvm/apis/neonvm/v1/virtualmachine_types.go index 01c56a935..d65a61ae9 100644 --- a/neonvm/apis/neonvm/v1/virtualmachine_types.go +++ b/neonvm/apis/neonvm/v1/virtualmachine_types.go @@ -21,8 +21,10 @@ import ( "errors" "fmt" "slices" + "time" "github.com/samber/lo" + "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -139,6 +141,15 @@ type VirtualMachineSpec struct { // +kubebuilder:default:=true // +optional EnableSSH *bool `json:"enableSSH,omitempty"` + + // TargetRevision is the identifier set by external party to track when changes to the spec + // propagate to the VM. + // + // If a certain value is written into Spec.TargetRevision together with the changes, and + // the same value is observed in Status.CurrentRevision, it means that the changes were + // propagated to the VM. + // +optional + TargetRevision *RevisionWithTime `json:"targetRevision,omitempty"` } func (spec *VirtualMachineSpec) Resources() VirtualMachineResources { @@ -215,6 +226,66 @@ func (g Guest) ValidateForMemoryProvider(p MemoryProvider) error { return nil } +// Flag is a bitmask of flags. The meaning is up to the user. +// +// Used in Revision below. +type Flag uint64 + +func (f *Flag) Set(flag Flag) { + *f |= flag +} + +func (f *Flag) Clear(flag Flag) { + *f &= ^flag +} + +func (f *Flag) Has(flag Flag) bool { + return *f&flag != 0 +} + +// Revision is an identifier, which can be assigned to a specific configuration of a VM. +// Later it can be used to track the application of the configuration. +type Revision struct { + Value int64 `json:"value"` + Flags Flag `json:"flags"` +} + +// ZeroRevision is the default value when revisions updates are disabled. +var ZeroRevision = Revision{Value: 0, Flags: 0} + +func (r Revision) Min(other Revision) Revision { + if r.Value < other.Value { + return r + } + return other +} + +func (r Revision) WithTime(t time.Time) RevisionWithTime { + return RevisionWithTime{ + Revision: r, + UpdatedAt: metav1.NewTime(t), + } +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, so that Revision can be used with zap.Object +func (r *Revision) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddInt64("value", r.Value) + enc.AddUint64("flags", uint64(r.Flags)) + return nil +} + +// RevisionWithTime contains a Revision and the time it was last updated. +type RevisionWithTime struct { + Revision `json:"revision"` + UpdatedAt metav1.Time `json:"updatedAt"` +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, so that RevisionWithTime can be used with zap.Object +func (r *RevisionWithTime) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddTime("updatedAt", r.UpdatedAt.Time) + return r.Revision.MarshalLogObject(enc) +} + type GuestSettings struct { // Individual lines to add to a sysctl.conf file. See sysctl.conf(5) for more // +optional @@ -534,6 +605,11 @@ type VirtualMachineStatus struct { MemoryProvider *MemoryProvider `json:"memoryProvider,omitempty"` // +optional SSHSecretName string `json:"sshSecretName,omitempty"` + + // CurrentRevision is updated with Spec.TargetRevision's value once + // the changes are propagated to the VM. 
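
// A minimal, self-contained sketch (not part of the patch itself) of how the Flag helpers and
// the Revision/RevisionWithTime types added above are meant to be used together with the new
// Spec.TargetRevision / Status.CurrentRevision pair. The flag constant names below are
// illustrative only; the meaning of each bit is left to the caller.
package main

import (
	"fmt"
	"time"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
)

const (
	flagUpscale   vmv1.Flag = 1 << 0
	flagDownscale vmv1.Flag = 1 << 1
)

func main() {
	// Bitmask helpers: Set/Clear/Has operate on individual bits.
	var flags vmv1.Flag
	flags.Set(flagUpscale)
	fmt.Println(flags.Has(flagUpscale), flags.Has(flagDownscale)) // true false

	// An external party stamps the spec with a new revision alongside its changes...
	target := vmv1.Revision{Value: 42, Flags: flags}.WithTime(time.Now())

	// ...and later considers the change propagated once the status reports a revision
	// with at least the same value.
	current := vmv1.Revision{Value: 41, Flags: 0}
	fmt.Println(current.Value >= target.Value) // false: not yet propagated
}
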
+ // +optional + CurrentRevision *RevisionWithTime `json:"currentRevision,omitempty"` } type VmPhase string diff --git a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go index 46f3499e5..3821c56e3 100644 --- a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go +++ b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go @@ -405,6 +405,38 @@ func (in *Port) DeepCopy() *Port { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Revision) DeepCopyInto(out *Revision) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Revision. +func (in *Revision) DeepCopy() *Revision { + if in == nil { + return nil + } + out := new(Revision) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RevisionWithTime) DeepCopyInto(out *RevisionWithTime) { + *out = *in + out.Revision = in.Revision + in.UpdatedAt.DeepCopyInto(&out.UpdatedAt) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RevisionWithTime. +func (in *RevisionWithTime) DeepCopy() *RevisionWithTime { + if in == nil { + return nil + } + out := new(RevisionWithTime) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RootDisk) DeepCopyInto(out *RootDisk) { *out = *in @@ -723,6 +755,11 @@ func (in *VirtualMachineSpec) DeepCopyInto(out *VirtualMachineSpec) { *out = new(bool) **out = **in } + if in.TargetRevision != nil { + in, out := &in.TargetRevision, &out.TargetRevision + *out = new(RevisionWithTime) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualMachineSpec. @@ -760,6 +797,11 @@ func (in *VirtualMachineStatus) DeepCopyInto(out *VirtualMachineStatus) { *out = new(MemoryProvider) **out = **in } + if in.CurrentRevision != nil { + in, out := &in.CurrentRevision, &out.CurrentRevision + *out = new(RevisionWithTime) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualMachineStatus. diff --git a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml index 6a3edcabe..805da9c0f 100644 --- a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml +++ b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml @@ -2662,6 +2662,37 @@ spec: type: boolean serviceAccountName: type: string + targetRevision: + description: "TargetRevision is the identifier set by external party + to track when changes to the spec propagate to the VM. \n If a certain + value is written into Spec.TargetRevision together with the changes, + and the same value is observed in Status.CurrentRevision, it means + that the changes were propagated to the VM." + properties: + revision: + description: Revision is an identifier, which can be assigned + to a specific configuration of a VM. Later it can be used to + track the application of the configuration. + properties: + flags: + description: "Flag is a bitmask of flags. The meaning is up + to the user. \n Used in Revision below." 
+ format: int64 + type: integer + value: + format: int64 + type: integer + required: + - flags + - value + type: object + updatedAt: + format: date-time + type: string + required: + - revision + - updatedAt + type: object terminationGracePeriodSeconds: default: 5 format: int64 @@ -2786,6 +2817,34 @@ spec: pattern: ^[0-9]+((\.[0-9]*)?|m) type: integer x-kubernetes-int-or-string: true + currentRevision: + description: CurrentRevision is updated with Spec.TargetRevision's + value once the changes are propagated to the VM. + properties: + revision: + description: Revision is an identifier, which can be assigned + to a specific configuration of a VM. Later it can be used to + track the application of the configuration. + properties: + flags: + description: "Flag is a bitmask of flags. The meaning is up + to the user. \n Used in Revision below." + format: int64 + type: integer + value: + format: int64 + type: integer + required: + - flags + - value + type: object + updatedAt: + format: date-time + type: string + required: + - revision + - updatedAt + type: object extraNetIP: type: string extraNetMask: diff --git a/neonvm/controllers/vm_controller.go b/neonvm/controllers/vm_controller.go index e1deb86ca..aa045798e 100644 --- a/neonvm/controllers/vm_controller.go +++ b/neonvm/controllers/vm_controller.go @@ -807,9 +807,27 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) // do nothing } + // Propagate TargetRevision to CurrentRevision. This is done only if the VM is fully + // reconciled and running. + if vm.Status.Phase == vmv1.VmRunning { + propagateRevision(vm) + } + return nil } +func propagateRevision(vm *vmv1.VirtualMachine) { + if vm.Spec.TargetRevision == nil { + return + } + if vm.Status.CurrentRevision != nil && + vm.Status.CurrentRevision.Revision == vm.Spec.TargetRevision.Revision { + return + } + rev := vm.Spec.TargetRevision.WithTime(time.Now()) + vm.Status.CurrentRevision = &rev +} + func pickMemoryProvider(config *ReconcilerConfig, vm *vmv1.VirtualMachine) vmv1.MemoryProvider { if p := vm.Spec.Guest.MemoryProvider; p != nil { return *p diff --git a/pkg/agent/core/action.go b/pkg/agent/core/action.go index 04e6c03e1..c7be28f31 100644 --- a/pkg/agent/core/action.go +++ b/pkg/agent/core/action.go @@ -5,6 +5,7 @@ import ( "go.uber.org/zap/zapcore" + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/api" ) @@ -21,24 +22,28 @@ type ActionWait struct { } type ActionPluginRequest struct { - LastPermit *api.Resources `json:"current"` - Target api.Resources `json:"target"` - Metrics *api.Metrics `json:"metrics"` + LastPermit *api.Resources `json:"current"` + Target api.Resources `json:"target"` + Metrics *api.Metrics `json:"metrics"` + TargetRevision vmv1.RevisionWithTime `json:"targetRevision"` } type ActionNeonVMRequest struct { - Current api.Resources `json:"current"` - Target api.Resources `json:"target"` + Current api.Resources `json:"current"` + Target api.Resources `json:"target"` + TargetRevision vmv1.RevisionWithTime `json:"targetRevision"` } type ActionMonitorDownscale struct { - Current api.Resources `json:"current"` - Target api.Resources `json:"target"` + Current api.Resources `json:"current"` + Target api.Resources `json:"target"` + TargetRevision vmv1.RevisionWithTime `json:"targetRevision"` } type ActionMonitorUpscale struct { - Current api.Resources `json:"current"` - Target api.Resources `json:"target"` + Current api.Resources `json:"current"` + Target api.Resources `json:"target"` + 
TargetRevision vmv1.RevisionWithTime `json:"targetRevision"` } func addObjectPtr[T zapcore.ObjectMarshaler](enc zapcore.ObjectEncoder, key string, value *T) error { diff --git a/pkg/agent/core/dumpstate.go b/pkg/agent/core/dumpstate.go index 63a8d0ce3..3e4d9353d 100644 --- a/pkg/agent/core/dumpstate.go +++ b/pkg/agent/core/dumpstate.go @@ -33,23 +33,26 @@ func (d StateDump) MarshalJSON() ([]byte, error) { func (s *State) Dump() StateDump { return StateDump{ internal: state{ - Debug: s.internal.Debug, - Config: s.internal.Config, - VM: s.internal.VM, - Plugin: s.internal.Plugin.deepCopy(), - Monitor: s.internal.Monitor.deepCopy(), - NeonVM: s.internal.NeonVM.deepCopy(), - Metrics: shallowCopy[SystemMetrics](s.internal.Metrics), + Debug: s.internal.Debug, + Config: s.internal.Config, + VM: s.internal.VM, + Plugin: s.internal.Plugin.deepCopy(), + Monitor: s.internal.Monitor.deepCopy(), + NeonVM: s.internal.NeonVM.deepCopy(), + Metrics: shallowCopy[SystemMetrics](s.internal.Metrics), + TargetRevision: s.internal.TargetRevision, + LastDesiredResources: s.internal.LastDesiredResources, }, } } func (s *pluginState) deepCopy() pluginState { return pluginState{ - OngoingRequest: s.OngoingRequest, - LastRequest: shallowCopy[pluginRequested](s.LastRequest), - LastFailureAt: shallowCopy[time.Time](s.LastFailureAt), - Permit: shallowCopy[api.Resources](s.Permit), + OngoingRequest: s.OngoingRequest, + LastRequest: shallowCopy[pluginRequested](s.LastRequest), + LastFailureAt: shallowCopy[time.Time](s.LastFailureAt), + Permit: shallowCopy[api.Resources](s.Permit), + CurrentRevision: s.CurrentRevision, } } @@ -61,6 +64,7 @@ func (s *monitorState) deepCopy() monitorState { Approved: shallowCopy[api.Resources](s.Approved), DownscaleFailureAt: shallowCopy[time.Time](s.DownscaleFailureAt), UpscaleFailureAt: shallowCopy[time.Time](s.UpscaleFailureAt), + CurrentRevision: s.CurrentRevision, } } @@ -69,5 +73,7 @@ func (s *neonvmState) deepCopy() neonvmState { LastSuccess: shallowCopy[api.Resources](s.LastSuccess), OngoingRequested: shallowCopy[api.Resources](s.OngoingRequested), RequestFailedAt: shallowCopy[time.Time](s.RequestFailedAt), + TargetRevision: s.TargetRevision, + CurrentRevision: s.CurrentRevision, } } diff --git a/pkg/agent/core/revsource/revsource.go b/pkg/agent/core/revsource/revsource.go new file mode 100644 index 000000000..d28e5d0b8 --- /dev/null +++ b/pkg/agent/core/revsource/revsource.go @@ -0,0 +1,105 @@ +package revsource + +import ( + "errors" + "time" + + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" +) + +const ( + Upscale vmv1.Flag = 1 << iota + Downscale +) + +// MaxRevisions is the maximum number of revisions that can be stored in the RevisionSource. +// This is to prevent memory leaks. +// Upon reaching it, the oldest revisions are discarded. +const MaxRevisions = 100 + +// RevisionSource can generate and observe revisions. +// Each Revision is a value and a set of flags (for meta-information). +// Once RevisionSource observes a previously generated Revision after some time, +// the time it took since that Revision was generated. +type RevisionSource struct { + cb ObserveCallback + + // The in-flight revisions are stored in-order. + // After the revision is observed, it is removed from the measurements, and the offset is increased. 
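
// A minimal sketch of the RevisionSource flow described above (it mirrors the usage in the
// tests further down): Next() stamps an outgoing change with a fresh Revision and records the
// current time; when the same Revision is later passed to Observe(), the callback receives how
// long the round trip took.
package main

import (
	"fmt"
	"time"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
	"github.com/neondatabase/autoscaling/pkg/agent/core/revsource"
)

func main() {
	cb := func(d time.Duration, flags vmv1.Flag) {
		fmt.Printf("revision propagated in %v (flags=%d)\n", d, flags)
	}
	src := revsource.NewRevisionSource(0, cb)

	start := time.Now()
	rev := src.Next(start, revsource.Upscale) // first value is initialRevision+1 == 1

	// ... the revision travels with the plugin/NeonVM/monitor requests ...

	// Observing the revision (here, 5s later) reports the elapsed time via the callback
	// and discards this measurement together with any older ones.
	if err := src.Observe(start.Add(5*time.Second), rev); err != nil {
		fmt.Println("observe failed:", err)
	}
}
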
+ measurements []time.Time + offset int64 +} + +func NewRevisionSource(initialRevision int64, cb ObserveCallback) *RevisionSource { + return &RevisionSource{ + cb: cb, + measurements: nil, + offset: initialRevision + 1, // Will start from the next one + } +} + +func (c *RevisionSource) nextValue() int64 { + return c.offset + int64(len(c.measurements)) +} + +func (c *RevisionSource) Next(now time.Time, flags vmv1.Flag) vmv1.Revision { + ret := vmv1.Revision{ + Value: c.nextValue(), + Flags: flags, + } + c.measurements = append(c.measurements, now) + + if len(c.measurements) > MaxRevisions { + c.measurements = c.measurements[1:] + c.offset++ + } + + return ret +} + +func (c *RevisionSource) Observe(moment time.Time, rev vmv1.Revision) error { + if rev.Value < c.offset { + // Already observed + return nil + } + + idx := rev.Value - c.offset + if idx > int64(len(c.measurements)) { + return errors.New("revision is in the future") + } + + diff := moment.Sub(c.measurements[idx]) + + if c.cb != nil { + c.cb(diff, rev.Flags) + } + + // Forget the measurement, and all the measurements before it. + c.offset = rev.Value + 1 + c.measurements = c.measurements[idx+1:] + + return nil +} + +type ObserveCallback func(dur time.Duration, flags vmv1.Flag) + +// Propagate sets the target revision to be current, optionally measuring the time it took +// for propagation. +func Propagate( + now time.Time, + target vmv1.RevisionWithTime, + currentSlot *vmv1.Revision, + cb ObserveCallback, +) { + if currentSlot == nil { + return + } + if currentSlot.Value >= target.Value { + return + } + if cb != nil { + diff := now.Sub(target.UpdatedAt.Time) + cb(diff, target.Flags) + } + *currentSlot = target.Revision +} diff --git a/pkg/agent/core/revsource/revsource_test.go b/pkg/agent/core/revsource/revsource_test.go new file mode 100644 index 000000000..87fe801c9 --- /dev/null +++ b/pkg/agent/core/revsource/revsource_test.go @@ -0,0 +1,118 @@ +package revsource_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" + "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" +) + +type testRevisionSource struct { + *revsource.RevisionSource + t *testing.T + now v1.Time + result *time.Duration + resultFlags *vmv1.Flag +} + +func (trs *testRevisionSource) advance(d time.Duration) { + trs.now = v1.NewTime(trs.now.Add(d)) +} + +func (trs *testRevisionSource) assertResult(d time.Duration, flags vmv1.Flag) { + require.NotNil(trs.t, trs.result) + assert.Equal(trs.t, d, *trs.result) + require.NotNil(trs.t, trs.resultFlags) + assert.Equal(trs.t, flags, *trs.resultFlags) + trs.result = nil +} + +func newTestRevisionSource(t *testing.T) *testRevisionSource { + tcm := &testRevisionSource{ + RevisionSource: nil, + t: t, + now: v1.NewTime(time.Now()), + result: nil, + resultFlags: nil, + } + + cb := func(d time.Duration, flags vmv1.Flag) { + tcm.result = &d + tcm.resultFlags = &flags + } + tcm.RevisionSource = revsource.NewRevisionSource(0, cb) + + return tcm +} + +func TestRevSource(t *testing.T) { + trs := newTestRevisionSource(t) + + // Generate new revision + rev := trs.Next(trs.now.Time, revsource.Upscale) + assert.Equal(t, int64(1), rev.Value) + + // Observe it coming back in 5 seconds + trs.advance(5 * time.Second) + err := trs.Observe(trs.now.Time, rev) + assert.NoError(t, err) + trs.assertResult(5*time.Second, revsource.Upscale) +} + +func 
TestRevSourceSkip(t *testing.T) { + trs := newTestRevisionSource(t) + + // Generate new revision + rev1 := trs.Next(trs.now.Time, 0) + assert.Equal(t, int64(1), rev1.Value) + + // Generate another one + trs.advance(5 * time.Second) + rev2 := trs.Next(trs.now.Time, 0) + assert.Equal(t, int64(2), rev2.Value) + + // Observe the first one + trs.advance(5 * time.Second) + err := trs.Observe(trs.now.Time, rev1) + assert.NoError(t, err) + trs.assertResult(10*time.Second, 0) + + // Observe the second one + trs.advance(2 * time.Second) + err = trs.Observe(trs.now.Time, rev2) + assert.NoError(t, err) + trs.assertResult(7*time.Second, 0) +} + +func TestStale(t *testing.T) { + trs := newTestRevisionSource(t) + + // Generate new revision + cl := trs.Next(trs.now.Time, 0) + assert.Equal(t, int64(1), cl.Value) + + // Observe it coming back in 5 seconds + trs.advance(5 * time.Second) + err := trs.Observe(trs.now.Time, cl) + assert.NoError(t, err) + trs.assertResult(5*time.Second, 0) + + // Observe it coming back again + trs.advance(5 * time.Second) + err = trs.Observe(trs.now.Time, cl) + // No error, but no result either + assert.NoError(t, err) + assert.Nil(t, trs.result) +} + +func TestNonZeroRev(t *testing.T) { + revSource := revsource.NewRevisionSource(5, nil) + rev := revSource.Next(time.Now(), 0) + assert.Equal(t, int64(6), rev.Value) +} diff --git a/pkg/agent/core/state.go b/pkg/agent/core/state.go index 905493fb0..f199f8579 100644 --- a/pkg/agent/core/state.go +++ b/pkg/agent/core/state.go @@ -30,10 +30,23 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" + "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/util" ) +type ObservabilityCallbacks struct { + PluginLatency revsource.ObserveCallback + MonitorLatency revsource.ObserveCallback + NeonVMLatency revsource.ObserveCallback +} + +type RevisionSource interface { + Next(ts time.Time, flags vmv1.Flag) vmv1.Revision + Observe(moment time.Time, rev vmv1.Revision) error +} + // Config represents some of the static configuration underlying the decision-making of State type Config struct { // ComputeUnit is the desired ratio between CPU and memory, copied from the global @@ -72,6 +85,12 @@ type Config struct { // Log provides an outlet for (*State).NextActions() to give informative messages or warnings // about conditions that are impeding its ability to execute. Log LogConfig `json:"-"` + + // RevisionSource is the source of revisions to track the progress during scaling. + RevisionSource RevisionSource `json:"-"` + + // ObservabilityCallbacks are the callbacks to submit datapoints for observability. + ObservabilityCallbacks ObservabilityCallbacks `json:"-"` } type LogConfig struct { @@ -114,6 +133,12 @@ type state struct { NeonVM neonvmState Metrics *SystemMetrics + + // TargetRevision is the revision agent works towards. + TargetRevision vmv1.Revision + + // LastDesiredResources is the last target agent wanted to scale to. + LastDesiredResources *api.Resources } type pluginState struct { @@ -127,6 +152,9 @@ type pluginState struct { // Permit, if not nil, stores the Permit in the most recent PluginResponse. This field will be // nil if we have not been able to contact *any* scheduler. Permit *api.Resources + + // CurrentRevision is the most recent revision the plugin has acknowledged. 
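
// A small sketch of how revsource.Propagate (from the new package above) is used for fields
// like CurrentRevision here: it bumps the slot up to the target revision exactly once and, if a
// callback is configured, reports the time elapsed since the target was stamped.
package main

import (
	"fmt"
	"time"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
	"github.com/neondatabase/autoscaling/pkg/agent/core/revsource"
)

func main() {
	current := vmv1.ZeroRevision // e.g. the plugin's CurrentRevision before any acknowledgement

	// A target revision that was stamped 200ms ago.
	stampedAt := time.Now().Add(-200 * time.Millisecond)
	target := vmv1.Revision{Value: 3, Flags: revsource.Upscale}.WithTime(stampedAt)

	revsource.Propagate(time.Now(), target, &current, func(d time.Duration, flags vmv1.Flag) {
		fmt.Printf("acknowledged after %v\n", d) // ~200ms
	})
	fmt.Println(current.Value) // 3

	// A second call with the same target is a no-op: current is no longer behind the target.
	revsource.Propagate(time.Now(), target, &current, nil)
}
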
+ CurrentRevision vmv1.Revision } type pluginRequested struct { @@ -155,6 +183,9 @@ type monitorState struct { // UpscaleFailureAt, if not nil, stores the time at which an upscale request most recently // failed UpscaleFailureAt *time.Time + + // CurrentRevision is the most recent revision the monitor has acknowledged. + CurrentRevision vmv1.Revision } func (ms *monitorState) active() bool { @@ -190,6 +221,12 @@ type neonvmState struct { // OngoingRequested, if not nil, gives the resources requested OngoingRequested *api.Resources RequestFailedAt *time.Time + + // TargetRevision is the revision agent works towards. Contrary to monitor/plugin, we + // store it not only in action, but also here. This is needed, because for NeonVM propagation + // happens after the changes are actually applied, when the action object is long gone. + TargetRevision vmv1.RevisionWithTime + CurrentRevision vmv1.Revision } func (ns *neonvmState) ongoingRequest() bool { @@ -203,10 +240,11 @@ func NewState(vm api.VmInfo, config Config) *State { Debug: false, VM: vm, Plugin: pluginState{ - OngoingRequest: false, - LastRequest: nil, - LastFailureAt: nil, - Permit: nil, + OngoingRequest: false, + LastRequest: nil, + LastFailureAt: nil, + Permit: nil, + CurrentRevision: vmv1.ZeroRevision, }, Monitor: monitorState{ OngoingRequest: nil, @@ -215,13 +253,18 @@ func NewState(vm api.VmInfo, config Config) *State { Approved: nil, DownscaleFailureAt: nil, UpscaleFailureAt: nil, + CurrentRevision: vmv1.ZeroRevision, }, NeonVM: neonvmState{ LastSuccess: nil, OngoingRequested: nil, RequestFailedAt: nil, + TargetRevision: vmv1.ZeroRevision.WithTime(time.Time{}), + CurrentRevision: vmv1.ZeroRevision, }, - Metrics: nil, + Metrics: nil, + LastDesiredResources: nil, + TargetRevision: vmv1.ZeroRevision, }, } } @@ -409,6 +452,7 @@ func (s *state) calculatePluginAction( return nil } }(), + TargetRevision: s.TargetRevision.WithTime(now), }, nil } else { if wantToRequestNewResources && waitingOnRetryBackoff { @@ -430,6 +474,17 @@ func (s *state) calculateNeonVMAction( pluginRequested *api.Resources, pluginRequestedPhase string, ) (*ActionNeonVMRequest, *time.Duration) { + targetRevision := s.TargetRevision + if desiredResources.HasFieldLessThan(s.VM.Using()) && s.Monitor.CurrentRevision.Value > 0 { + // We are downscaling, so we needed a permit from the monitor + targetRevision = targetRevision.Min(s.Monitor.CurrentRevision) + } + + if desiredResources.HasFieldGreaterThan(s.VM.Using()) && s.Plugin.CurrentRevision.Value > 0 { + // We are upscaling, so we needed a permit from the plugin + targetRevision = targetRevision.Min(s.Plugin.CurrentRevision) + } + // clamp desiredResources to what we're allowed to make a request for desiredResources = s.clampResources( s.VM.Using(), // current: what we're using already @@ -456,9 +511,11 @@ func (s *state) calculateNeonVMAction( } } + s.NeonVM.TargetRevision = targetRevision.WithTime(now) return &ActionNeonVMRequest{ - Current: s.VM.Using(), - Target: desiredResources, + Current: s.VM.Using(), + Target: desiredResources, + TargetRevision: s.NeonVM.TargetRevision, }, nil } else { var reqs []string @@ -541,8 +598,9 @@ func (s *state) calculateMonitorUpscaleAction( // Otherwise, we can make the request: return &ActionMonitorUpscale{ - Current: *s.Monitor.Approved, - Target: requestResources, + Current: *s.Monitor.Approved, + Target: requestResources, + TargetRevision: s.TargetRevision.WithTime(now), }, nil } @@ -627,8 +685,9 @@ func (s *state) calculateMonitorDownscaleAction( // Nothing else to check, 
we're good to make the request return &ActionMonitorDownscale{ - Current: *s.Monitor.Approved, - Target: requestResources, + Current: *s.Monitor.Approved, + Target: requestResources, + TargetRevision: s.TargetRevision.WithTime(now), }, nil } @@ -791,12 +850,58 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) ( return nil } } + s.updateTargetRevision(now, result, s.VM.Using()) - s.info("Calculated desired resources", zap.Object("current", s.VM.Using()), zap.Object("target", result)) + // TODO: we are both saving the result into LastDesiredResources and returning it. This is + // redundant, and we should remove one of the two. + s.LastDesiredResources = &result + + s.info("Calculated desired resources", + zap.Object("current", s.VM.Using()), + zap.Object("target", result), + zap.Object("targetRevision", &s.TargetRevision)) return result, calculateWaitTime } +func (s *state) updateTargetRevision(now time.Time, desired api.Resources, current api.Resources) { + if s.LastDesiredResources == nil { + s.LastDesiredResources = ¤t + } + + if *s.LastDesiredResources == desired { + // Nothing changed, so no need to update the target revision + return + } + + var flags vmv1.Flag + + if desired.HasFieldGreaterThan(*s.LastDesiredResources) { + flags.Set(revsource.Upscale) + } + if desired.HasFieldLessThan(*s.LastDesiredResources) { + flags.Set(revsource.Downscale) + } + + s.TargetRevision = s.Config.RevisionSource.Next(now, flags) +} + +func (s *state) updateNeonVMCurrentRevision(currentRevision vmv1.RevisionWithTime) { + revsource.Propagate(currentRevision.UpdatedAt.Time, + s.NeonVM.TargetRevision, + &s.NeonVM.CurrentRevision, + s.Config.ObservabilityCallbacks.NeonVMLatency, + ) + err := s.Config.RevisionSource.Observe(currentRevision.UpdatedAt.Time, currentRevision.Revision) + if err != nil { + s.warnf("Failed to observe clock source: %v", err) + } + + // We also zero out LastDesiredResources, because we are now starting from + // a new current resources. + s.LastDesiredResources = nil +} + func (s *state) timeUntilRequestedUpscalingExpired(now time.Time) time.Duration { if s.Monitor.RequestedUpscale != nil { return s.Monitor.RequestedUpscale.At.Add(s.Config.MonitorRequestedUpscaleValidPeriod).Sub(now) @@ -932,6 +1037,9 @@ func (s *State) UpdatedVM(vm api.VmInfo) { // - https://github.com/neondatabase/autoscaling/issues/462 vm.SetUsing(s.internal.VM.Using()) s.internal.VM = vm + if vm.CurrentRevision != nil { + s.internal.updateNeonVMCurrentRevision(*vm.CurrentRevision) + } } func (s *State) UpdateSystemMetrics(metrics SystemMetrics) { @@ -964,8 +1072,13 @@ func (h PluginHandle) RequestFailed(now time.Time) { h.s.Plugin.LastFailureAt = &now } -func (h PluginHandle) RequestSuccessful(now time.Time, resp api.PluginResponse) (_err error) { +func (h PluginHandle) RequestSuccessful( + now time.Time, + targetRevision vmv1.RevisionWithTime, + resp api.PluginResponse, +) (_err error) { h.s.Plugin.OngoingRequest = false + defer func() { if _err != nil { h.s.Plugin.LastFailureAt = &now @@ -995,6 +1108,11 @@ func (h PluginHandle) RequestSuccessful(now time.Time, resp api.PluginResponse) // the process of moving the source of truth for ComputeUnit from the scheduler plugin to the // autoscaler-agent. 
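
// A sketch of the flag computation in updateTargetRevision above, using the same
// HasFieldGreaterThan/HasFieldLessThan comparisons: if the desired resources move in both
// directions at once (e.g. more CPU but less memory), the new revision carries both bits.
// The literal field values below are placeholders in the same style as the tests.
package main

import (
	"fmt"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
	"github.com/neondatabase/autoscaling/pkg/agent/core/revsource"
	"github.com/neondatabase/autoscaling/pkg/api"
)

func main() {
	last := api.Resources{VCPU: 250, Mem: 2}
	desired := api.Resources{VCPU: 500, Mem: 1}

	var flags vmv1.Flag
	if desired.HasFieldGreaterThan(last) {
		flags.Set(revsource.Upscale)
	}
	if desired.HasFieldLessThan(last) {
		flags.Set(revsource.Downscale)
	}
	fmt.Println(flags.Has(revsource.Upscale), flags.Has(revsource.Downscale)) // true true
}
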
h.s.Plugin.Permit = &resp.Permit + revsource.Propagate(now, + targetRevision, + &h.s.Plugin.CurrentRevision, + h.s.Config.ObservabilityCallbacks.PluginLatency, + ) return nil } @@ -1015,6 +1133,7 @@ func (h MonitorHandle) Reset() { Approved: nil, DownscaleFailureAt: nil, UpscaleFailureAt: nil, + CurrentRevision: vmv1.ZeroRevision, } } @@ -1061,19 +1180,29 @@ func (h MonitorHandle) StartingDownscaleRequest(now time.Time, resources api.Res h.s.Monitor.DownscaleFailureAt = nil } -func (h MonitorHandle) DownscaleRequestAllowed(now time.Time) { +func (h MonitorHandle) DownscaleRequestAllowed(now time.Time, rev vmv1.RevisionWithTime) { h.s.Monitor.Approved = &h.s.Monitor.OngoingRequest.Requested h.s.Monitor.OngoingRequest = nil + revsource.Propagate(now, + rev, + &h.s.Monitor.CurrentRevision, + h.s.Config.ObservabilityCallbacks.MonitorLatency, + ) } // Downscale request was successful but the monitor denied our request. -func (h MonitorHandle) DownscaleRequestDenied(now time.Time) { +func (h MonitorHandle) DownscaleRequestDenied(now time.Time, targetRevision vmv1.RevisionWithTime) { h.s.Monitor.DeniedDownscale = &deniedDownscale{ At: now, Current: *h.s.Monitor.Approved, Requested: h.s.Monitor.OngoingRequest.Requested, } h.s.Monitor.OngoingRequest = nil + revsource.Propagate(now, + targetRevision, + &h.s.Monitor.CurrentRevision, + h.s.Config.ObservabilityCallbacks.MonitorLatency, + ) } func (h MonitorHandle) DownscaleRequestFailed(now time.Time) { diff --git a/pkg/agent/core/state_test.go b/pkg/agent/core/state_test.go index 7684b1add..0b86192c5 100644 --- a/pkg/agent/core/state_test.go +++ b/pkg/agent/core/state_test.go @@ -6,10 +6,14 @@ import ( "time" "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/exp/slices" + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/agent/core" + "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" helpers "github.com/neondatabase/autoscaling/pkg/agent/core/testhelpers" "github.com/neondatabase/autoscaling/pkg/api" ) @@ -104,6 +108,7 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { ScalingEnabled: true, ScalingConfig: nil, }, + CurrentRevision: nil, }, core.Config{ ComputeUnit: api.Resources{VCPU: 250, Mem: 1 * slotSize}, @@ -126,6 +131,12 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { warnings = append(warnings, msg) }, }, + RevisionSource: revsource.NewRevisionSource(0, nil), + ObservabilityCallbacks: core.ObservabilityCallbacks{ + PluginLatency: nil, + MonitorLatency: nil, + NeonVMLatency: nil, + }, }, ) @@ -137,7 +148,7 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { // set lastApproved by simulating a scheduler request/response state.Plugin().StartingRequest(now, c.schedulerApproved) - err := state.Plugin().RequestSuccessful(now, api.PluginResponse{ + err := state.Plugin().RequestSuccessful(now, vmv1.ZeroRevision.WithTime(now), api.PluginResponse{ Permit: c.schedulerApproved, Migrate: nil, }) @@ -151,7 +162,7 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { state.Monitor().Reset() state.Monitor().Active(true) state.Monitor().StartingDownscaleRequest(now, *c.deniedDownscale) - state.Monitor().DownscaleRequestDenied(now) + state.Monitor().DownscaleRequestDenied(now, vmv1.ZeroRevision.WithTime(now)) } actual, _ := state.DesiredResourcesFromMetricsOrRequestedUpscaling(now) @@ -194,6 +205,12 @@ var 
DefaultInitialStateConfig = helpers.InitialStateConfig{ Info: nil, Warn: nil, }, + RevisionSource: &helpers.NilRevisionSource{}, + ObservabilityCallbacks: core.ObservabilityCallbacks{ + PluginLatency: nil, + MonitorLatency: nil, + NeonVMLatency: nil, + }, }, } @@ -210,16 +227,18 @@ func doInitialPluginRequest( metrics *api.Metrics, resources api.Resources, ) { + rev := vmv1.ZeroRevision.WithTime(clock.Now()) a.Call(state.NextActions, clock.Now()).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: nil, - Target: resources, - Metrics: metrics, + LastPermit: nil, + Target: resources, + Metrics: metrics, + TargetRevision: rev, }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resources) clock.Inc(requestTime) - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), rev, api.PluginResponse{ Permit: resources, Migrate: nil, }) @@ -234,6 +253,33 @@ func duration(s string) time.Duration { return d } +type latencyObserver struct { + t *testing.T + observations []struct { + latency time.Duration + flags vmv1.Flag + } +} + +func (a *latencyObserver) observe(latency time.Duration, flags vmv1.Flag) { + a.observations = append(a.observations, struct { + latency time.Duration + flags vmv1.Flag + }{latency, flags}) +} + +func (a *latencyObserver) assert(latency time.Duration, flags vmv1.Flag) { + require.NotEmpty(a.t, a.observations) + assert.Equal(a.t, latency, a.observations[0].latency) + assert.Equal(a.t, flags, a.observations[0].flags) + a.observations = a.observations[1:] +} + +// assertEmpty should be called in defer +func (a *latencyObserver) assertEmpty() { + assert.Empty(a.t, a.observations) +} + // Thorough checks of a relatively simple flow - scaling from 1 CU to 2 CU and back down. func TestBasicScaleUpAndDownFlow(t *testing.T) { a := helpers.NewAssert(t) @@ -241,12 +287,18 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { clockTick := func() helpers.Elapsed { return clock.Inc(100 * time.Millisecond) } + expectedRevision := helpers.NewExpectedRevision(clock.Now) resForCU := DefaultComputeUnit.Mul + latencyObserver := &latencyObserver{t: t, observations: nil} + defer latencyObserver.assertEmpty() state := helpers.CreateInitialState( DefaultInitialStateConfig, helpers.WithStoredWarnings(a.StoredWarnings()), helpers.WithTestingLogfWarnings(t), + helpers.WithConfigSetting(func(c *core.Config) { + c.RevisionSource = revsource.NewRevisionSource(0, latencyObserver.observe) + }), ) nextActions := func() core.ActionSet { return state.NextActions(clock.Now()) @@ -269,12 +321,17 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { Equals(resForCU(2)) // Now that the initial scheduler request is done, and we have metrics that indicate - // scale-up would be a good idea, we should be contacting the scheduler to get approval. + // scale-up would be a good idea. + expectedRevision.Value = 1 + expectedRevision.Flags = revsource.Upscale + + // We should be contacting the scheduler to get approval. 
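
// The test helpers referenced in this diff (helpers.NilRevisionSource, helpers.NewExpectedRevision,
// ExpectedRevision.WithTime, helpers.WithCurrentRevision, ...) live in the testhelpers package and
// are not shown here. As a rough, assumed sketch of the two simplest ones, consistent with how the
// tests use them:
package testhelpers

import (
	"time"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
)

// NilRevisionSource would satisfy core.RevisionSource while never advancing revisions and never
// reporting latencies.
type NilRevisionSource struct{}

func (NilRevisionSource) Next(_ time.Time, _ vmv1.Flag) vmv1.Revision {
	return vmv1.Revision{Value: 0, Flags: 0}
}
func (NilRevisionSource) Observe(_ time.Time, _ vmv1.Revision) error { return nil }

// ExpectedRevision would let a test mutate Value/Flags directly and re-stamp the expectation with
// the fake clock's current time via WithTime().
type ExpectedRevision struct {
	vmv1.Revision
	Now func() time.Time
}

func NewExpectedRevision(now func() time.Time) *ExpectedRevision {
	return &ExpectedRevision{Revision: vmv1.ZeroRevision, Now: now}
}

func (e *ExpectedRevision) WithTime() vmv1.RevisionWithTime {
	return e.Revision.WithTime(e.Now())
}
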
a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(1)), - Target: resForCU(2), - Metrics: lo.ToPtr(lastMetrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(1)), + Target: resForCU(2), + Metrics: lo.ToPtr(lastMetrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) // start the request: @@ -282,7 +339,7 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { clockTick().AssertEquals(duration("0.3s")) // should have nothing more to do; waiting on plugin request to come back a.Call(nextActions).Equals(core.ActionSet{}) - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(2), Migrate: nil, }) @@ -294,8 +351,9 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { // the next scheduler request. Wait: &core.ActionWait{Duration: duration("4.9s")}, NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) // start the request: @@ -305,14 +363,27 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.8s")}, }) + + // Until NeonVM is successful, we won't see any observations. + latencyObserver.assertEmpty() + + // Now NeonVM request is done. a.Do(state.NeonVM().RequestSuccessful, clock.Now()) + a.Do(state.UpdatedVM, helpers.CreateVmInfo( + DefaultInitialStateConfig.VM, + helpers.WithCurrentRevision(expectedRevision.WithTime()), + )) + + // And we see the latency. We started at 0.2s and finished at 0.4s + latencyObserver.assert(duration("0.2s"), revsource.Upscale) // NeonVM change is done, now we should finish by notifying the vm-monitor a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.8s")}, // same as previous, clock hasn't changed MonitorUpscale: &core.ActionMonitorUpscale{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) // start the request: @@ -334,6 +405,9 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { clockTick().AssertEquals(duration("0.6s")) + expectedRevision.Value += 1 + expectedRevision.Flags = revsource.Downscale + // Set metrics back so that desired resources should now be zero lastMetrics = core.SystemMetrics{ LoadAverage1Min: 0.0, @@ -348,8 +422,9 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.6s")}, MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(2), - Target: resForCU(1), + Current: resForCU(2), + Target: resForCU(1), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingDownscaleRequest, clock.Now(), resForCU(1)) @@ -358,14 +433,15 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.5s")}, }) - a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now()) + a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now(), expectedRevision.WithTime()) // After getting approval from the vm-monitor, we make the request to NeonVM to carry it out a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.5s")}, // same as previous, clock hasn't changed NeonVMRequest: 
&core.ActionNeonVMRequest{ - Current: resForCU(2), - Target: resForCU(1), + Current: resForCU(2), + Target: resForCU(1), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(1)) @@ -376,20 +452,34 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { }) a.Do(state.NeonVM().RequestSuccessful, clock.Now()) + // Request to NeonVM is successful, but let's wait one more tick for + // NeonVM to pick up the changes and apply those. + clockTick().AssertEquals(duration("0.9s")) + + // This means that the NeonVM has applied the changes. + a.Do(state.UpdatedVM, helpers.CreateVmInfo( + DefaultInitialStateConfig.VM, + helpers.WithCurrentRevision(expectedRevision.WithTime()), + )) + + // We started at 0.6s and finished at 0.9s. + latencyObserver.assert(duration("0.3s"), revsource.Downscale) + // Request to NeonVM completed, it's time to inform the scheduler plugin: a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(2)), - Target: resForCU(1), - Metrics: lo.ToPtr(lastMetrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(2)), + Target: resForCU(1), + Metrics: lo.ToPtr(lastMetrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, // shouldn't have anything to say to the other components }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(1)) - clockTick().AssertEquals(duration("0.9s")) + clockTick().AssertEquals(duration("1s")) // should have nothing more to do; waiting on plugin request to come back a.Call(nextActions).Equals(core.ActionSet{}) - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(1), Migrate: nil, }) @@ -404,10 +494,19 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { func TestPeriodicPluginRequest(t *testing.T) { a := helpers.NewAssert(t) clock := helpers.NewFakeClock(t) + expectedRevision := helpers.NewExpectedRevision(clock.Now) + + latencyObserver := &latencyObserver{t: t, observations: nil} + defer latencyObserver.assertEmpty() state := helpers.CreateInitialState( DefaultInitialStateConfig, helpers.WithStoredWarnings(a.StoredWarnings()), + helpers.WithConfigSetting(func(c *core.Config) { + // This time, we will test plugin latency + c.ObservabilityCallbacks.PluginLatency = latencyObserver.observe + c.RevisionSource = revsource.NewRevisionSource(0, nil) + }), ) state.Monitor().Active(true) @@ -440,18 +539,20 @@ func TestPeriodicPluginRequest(t *testing.T) { }) clock.Inc(clockTick) } else { + target := expectedRevision.WithTime() a.Call(state.NextActions, clock.Now()).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: &resources, - Target: resources, - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: &resources, + Target: resources, + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: target, }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resources) a.Call(state.NextActions, clock.Now()).Equals(core.ActionSet{}) clock.Inc(reqDuration) a.Call(state.NextActions, clock.Now()).Equals(core.ActionSet{}) - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), target, api.PluginResponse{ Permit: resources, Migrate: nil, }) @@ -460,6 +561,152 @@ func TestPeriodicPluginRequest(t *testing.T) { } } +// In this test agent wants to upscale from 1 CU to 4 CU, but the plugin only allows 3 CU. 
+// Agent upscales to 3 CU, then tries to upscale to 4 CU again. +func TestPartialUpscaleThenFull(t *testing.T) { + a := helpers.NewAssert(t) + clock := helpers.NewFakeClock(t) + clockTickDuration := duration("0.1s") + clockTick := func() { + clock.Inc(clockTickDuration) + } + expectedRevision := helpers.NewExpectedRevision(clock.Now) + scalingLatencyObserver := &latencyObserver{t: t, observations: nil} + defer scalingLatencyObserver.assertEmpty() + + pluginLatencyObserver := &latencyObserver{t: t, observations: nil} + defer pluginLatencyObserver.assertEmpty() + + resForCU := DefaultComputeUnit.Mul + + state := helpers.CreateInitialState( + DefaultInitialStateConfig, + helpers.WithStoredWarnings(a.StoredWarnings()), + helpers.WithMinMaxCU(1, 4), + helpers.WithCurrentCU(1), + helpers.WithConfigSetting(func(c *core.Config) { + c.RevisionSource = revsource.NewRevisionSource(0, scalingLatencyObserver.observe) + c.ObservabilityCallbacks.PluginLatency = pluginLatencyObserver.observe + }), + ) + + nextActions := func() core.ActionSet { + return state.NextActions(clock.Now()) + } + + state.Monitor().Active(true) + + doInitialPluginRequest(a, state, clock, duration("0.1s"), nil, resForCU(1)) + + // Set metrics + clockTick() + metrics := core.SystemMetrics{ + LoadAverage1Min: 1.0, + MemoryUsageBytes: 12345678, + } + a.Do(state.UpdateSystemMetrics, metrics) + + // double-check that we agree about the desired resources + a.Call(getDesiredResources, state, clock.Now()). + Equals(resForCU(4)) + + // Upscaling to 4 CU + expectedRevision.Value = 1 + expectedRevision.Flags = revsource.Upscale + targetRevision := expectedRevision.WithTime() + a.Call(nextActions).Equals(core.ActionSet{ + PluginRequest: &core.ActionPluginRequest{ + LastPermit: lo.ToPtr(resForCU(1)), + Target: resForCU(4), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: targetRevision, + }, + }) + + a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(4)) + clockTick() + a.Call(nextActions).Equals(core.ActionSet{}) + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), targetRevision, api.PluginResponse{ + Permit: resForCU(3), + Migrate: nil, + }) + + pluginLatencyObserver.assert(duration("0.1s"), revsource.Upscale) + + // NeonVM request + a. + WithWarnings("Wanted to make a request to the scheduler plugin, but previous request for more resources was denied too recently"). + Call(nextActions). 
+ Equals(core.ActionSet{ + Wait: &core.ActionWait{Duration: duration("1.9s")}, + NeonVMRequest: &core.ActionNeonVMRequest{ + Current: resForCU(1), + Target: resForCU(3), + TargetRevision: expectedRevision.WithTime(), + }, + }) + + a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(3)) + clockTick() + a.Do(state.NeonVM().RequestSuccessful, clock.Now()) + clockTick() + a.Do(state.UpdatedVM, helpers.CreateVmInfo( + DefaultInitialStateConfig.VM, + helpers.WithCurrentCU(3), + helpers.WithCurrentRevision(expectedRevision.WithTime()), + )) + scalingLatencyObserver.assert(duration("0.3s"), revsource.Upscale) + + clock.Inc(duration("2s")) + + // Upscaling to 4 CU + expectedRevision.Value += 1 + targetRevision = expectedRevision.WithTime() + a.Call(nextActions).Equals(core.ActionSet{ + MonitorUpscale: &core.ActionMonitorUpscale{ + Current: resForCU(1), + Target: resForCU(3), + TargetRevision: expectedRevision.WithTime(), + }, + PluginRequest: &core.ActionPluginRequest{ + LastPermit: lo.ToPtr(resForCU(3)), + Target: resForCU(4), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), + }, + }) + + a.Do(state.Monitor().StartingUpscaleRequest, clock.Now(), resForCU(3)) + a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(4)) + clockTick() + a.Do(state.Monitor().UpscaleRequestSuccessful, clock.Now()) + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), targetRevision, api.PluginResponse{ + Permit: resForCU(4), + Migrate: nil, + }) + pluginLatencyObserver.assert(duration("0.1s"), revsource.Upscale) + a.Call(nextActions).Equals(core.ActionSet{ + Wait: &core.ActionWait{Duration: duration("4.9s")}, + NeonVMRequest: &core.ActionNeonVMRequest{ + Current: resForCU(3), + Target: resForCU(4), + TargetRevision: expectedRevision.WithTime(), + }, + }) + a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(4)) + clockTick() + a.Do(state.NeonVM().RequestSuccessful, clock.Now()) + vmInfo := helpers.CreateVmInfo( + DefaultInitialStateConfig.VM, + helpers.WithCurrentCU(4), + helpers.WithCurrentRevision(expectedRevision.WithTime()), + ) + clockTick() + a.Do(state.UpdatedVM, vmInfo) + + scalingLatencyObserver.assert(duration("0.2s"), revsource.Upscale) +} + // Checks that when downscaling is denied, we both (a) try again with higher resources, or (b) wait // to retry if there aren't higher resources to try with. 
func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { @@ -469,6 +716,9 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { clockTick := func() { clock.Inc(clockTickDuration) } + expectedRevision := helpers.NewExpectedRevision(clock.Now) + latencyObserver := &latencyObserver{t: t, observations: nil} + defer latencyObserver.assertEmpty() resForCU := DefaultComputeUnit.Mul state := helpers.CreateInitialState( @@ -526,13 +776,14 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("6.8s")}, MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(6), - Target: resForCU(5), + Current: resForCU(6), + Target: resForCU(5), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingDownscaleRequest, clock.Now(), resForCU(5)) clockTick() - a.Do(state.Monitor().DownscaleRequestDenied, clock.Now()) + a.Do(state.Monitor().DownscaleRequestDenied, clock.Now(), expectedRevision.WithTime()) // At the end, we should be waiting to retry downscaling: a.Call(nextActions).Equals(core.ActionSet{ @@ -548,16 +799,18 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { var expectedNeonVMRequest *core.ActionNeonVMRequest if cu < 5 { expectedNeonVMRequest = &core.ActionNeonVMRequest{ - Current: resForCU(6), - Target: resForCU(cu + 1), + Current: resForCU(6), + Target: resForCU(cu + 1), + TargetRevision: expectedRevision.WithTime(), } } a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: currentPluginWait}, MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(cu + 1), - Target: resForCU(cu), + Current: resForCU(cu + 1), + Target: resForCU(cu), + TargetRevision: expectedRevision.WithTime(), }, NeonVMRequest: expectedNeonVMRequest, }) @@ -569,9 +822,9 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { clockTick() currentPluginWait -= clockTickDuration if cu >= 3 /* allow down to 3 */ { - a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now()) + a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now(), expectedRevision.WithTime()) } else { - a.Do(state.Monitor().DownscaleRequestDenied, clock.Now()) + a.Do(state.Monitor().DownscaleRequestDenied, clock.Now(), expectedRevision.WithTime()) } } // At this point, waiting 3.7s for next attempt to downscale below 3 CU (last request was @@ -580,8 +833,9 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("2.3s")}, NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(6), - Target: resForCU(3), + Current: resForCU(6), + Target: resForCU(3), + TargetRevision: expectedRevision.WithTime(), }, }) // Make the request: @@ -596,9 +850,10 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("3.9s")}, PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(6)), - Target: resForCU(3), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(6)), + Target: resForCU(3), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(3)) @@ -606,7 +861,7 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { Wait: &core.ActionWait{Duration: duration("3.9s")}, }) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + 
a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(3), Migrate: nil, }) @@ -622,13 +877,14 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("3.1s")}, MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(3), - Target: resForCU(2), + Current: resForCU(3), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingDownscaleRequest, clock.Now(), resForCU(2)) clockTick() - a.Do(state.Monitor().DownscaleRequestDenied, clock.Now()) + a.Do(state.Monitor().DownscaleRequestDenied, clock.Now(), expectedRevision.WithTime()) // At the end, we should be waiting to retry downscaling (but actually, the regular plugin // request is coming up sooner). a.Call(nextActions).Equals(core.ActionSet{ @@ -639,9 +895,10 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("1s")}, // still want to retry vm-monitor downscaling PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(3)), - Target: resForCU(3), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(3)), + Target: resForCU(3), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(3)) @@ -649,7 +906,7 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { Wait: &core.ActionWait{Duration: duration("1s")}, // still waiting on retrying vm-monitor downscaling }) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(3), Migrate: nil, }) @@ -665,16 +922,18 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { var expectedNeonVMRequest *core.ActionNeonVMRequest if cu < 2 { expectedNeonVMRequest = &core.ActionNeonVMRequest{ - Current: resForCU(3), - Target: resForCU(cu + 1), + Current: resForCU(3), + Target: resForCU(cu + 1), + TargetRevision: expectedRevision.WithTime(), } } a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: currentPluginWait}, MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(cu + 1), - Target: resForCU(cu), + Current: resForCU(cu + 1), + Target: resForCU(cu), + TargetRevision: expectedRevision.WithTime(), }, NeonVMRequest: expectedNeonVMRequest, }) @@ -685,15 +944,16 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { }) clockTick() currentPluginWait -= clockTickDuration - a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now()) + a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now(), expectedRevision.WithTime()) } // Still waiting on plugin request tick, but we can make a NeonVM request to enact the // downscaling right away ! a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("5.8s")}, NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(3), - Target: resForCU(1), + Current: resForCU(3), + Target: resForCU(1), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, time.Now(), resForCU(1)) @@ -705,9 +965,10 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { // Successfully downscaled, so now we should inform the plugin. Not waiting on any retries. 
a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(3)), - Target: resForCU(1), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(3)), + Target: resForCU(1), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(1)) @@ -715,7 +976,7 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { // not waiting on anything! }) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(1), Migrate: nil, }) @@ -733,12 +994,16 @@ func TestRequestedUpscale(t *testing.T) { clockTick := func() { clock.Inc(100 * time.Millisecond) } + expectedRevision := helpers.NewExpectedRevision(clock.Now) resForCU := DefaultComputeUnit.Mul + latencyObserver := &latencyObserver{t: t, observations: nil} + defer latencyObserver.assertEmpty() state := helpers.CreateInitialState( DefaultInitialStateConfig, helpers.WithStoredWarnings(a.StoredWarnings()), helpers.WithConfigSetting(func(c *core.Config) { + c.RevisionSource = revsource.NewRevisionSource(0, latencyObserver.observe) c.MonitorRequestedUpscaleValidPeriod = duration("6s") // Override this for consistency }), ) @@ -766,13 +1031,18 @@ func TestRequestedUpscale(t *testing.T) { // Have the vm-monitor request upscaling: a.Do(state.Monitor().UpscaleRequested, clock.Now(), api.MoreResources{Cpu: false, Memory: true}) + // Revision advances + expectedRevision.Value = 1 + expectedRevision.Flags = revsource.Upscale + // First need to check with the scheduler plugin to get approval for upscaling: a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("6s")}, // if nothing else happens, requested upscale expires. 
PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(1)), - Target: resForCU(2), - Metrics: lo.ToPtr(lastMetrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(1)), + Target: resForCU(2), + Metrics: lo.ToPtr(lastMetrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(2)) @@ -780,7 +1050,7 @@ func TestRequestedUpscale(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("5.9s")}, // same waiting for requested upscale expiring }) - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(2), Migrate: nil, }) @@ -789,20 +1059,30 @@ func TestRequestedUpscale(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.9s")}, // plugin tick wait is earlier than requested upscale expiration NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(2)) clockTick() a.Do(state.NeonVM().RequestSuccessful, clock.Now()) + // Update the VM to set current=1 + a.Do(state.UpdatedVM, helpers.CreateVmInfo( + DefaultInitialStateConfig.VM, + helpers.WithCurrentCU(2), + helpers.WithCurrentRevision(expectedRevision.WithTime()), + )) + latencyObserver.assert(duration("0.2s"), revsource.Upscale) + // Finally, tell the vm-monitor that it got upscaled: a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.8s")}, // still waiting on plugin tick MonitorUpscale: &core.ActionMonitorUpscale{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingUpscaleRequest, clock.Now(), resForCU(2)) @@ -821,9 +1101,10 @@ func TestRequestedUpscale(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("1s")}, PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(2)), - Target: resForCU(2), - Metrics: lo.ToPtr(lastMetrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(2)), + Target: resForCU(2), + Metrics: lo.ToPtr(lastMetrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(2)) @@ -831,7 +1112,7 @@ func TestRequestedUpscale(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("0.9s")}, // waiting for requested upscale expiring }) - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(2), Migrate: nil, }) @@ -841,11 +1122,15 @@ func TestRequestedUpscale(t *testing.T) { Wait: &core.ActionWait{Duration: duration("0.9s")}, }) clock.Inc(duration("0.9s")) + // Upscale expired, revision advances + expectedRevision.Value = 2 + expectedRevision.Flags = revsource.Downscale a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4s")}, // now, waiting on plugin request tick MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(2), - Target: resForCU(1), + Current: resForCU(2), + Target: resForCU(1), + TargetRevision: 
expectedRevision.WithTime(), }, }) } @@ -858,7 +1143,6 @@ func TestRequestedUpscale(t *testing.T) { func TestDownscalePivotBack(t *testing.T) { a := helpers.NewAssert(t) var clock *helpers.FakeClock - clockTickDuration := duration("0.1s") clockTick := func() helpers.Elapsed { return clock.Inc(clockTickDuration) @@ -866,6 +1150,9 @@ func TestDownscalePivotBack(t *testing.T) { halfClockTick := func() helpers.Elapsed { return clock.Inc(clockTickDuration / 2) } + var expectedRevision *helpers.ExpectedRevision + latencyObserver := &latencyObserver{t: t, observations: nil} + defer latencyObserver.assertEmpty() resForCU := DefaultComputeUnit.Mul var state *core.State @@ -895,6 +1182,8 @@ func TestDownscalePivotBack(t *testing.T) { MonitorDownscale: &core.ActionMonitorDownscale{ Current: resForCU(2), Target: resForCU(1), + + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingDownscaleRequest, clock.Now(), resForCU(1)) @@ -903,15 +1192,17 @@ func TestDownscalePivotBack(t *testing.T) { halfClockTick() *pluginWait -= clockTickDuration t.Log(" > finish vm-monitor downscale") - a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now()) + a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now(), expectedRevision.WithTime()) }, post: func(pluginWait *time.Duration) { + expectedRevision.Value = 2 t.Log(" > start vm-monitor upscale") a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: *pluginWait}, MonitorUpscale: &core.ActionMonitorUpscale{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingUpscaleRequest, clock.Now(), resForCU(2)) @@ -928,8 +1219,9 @@ func TestDownscalePivotBack(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: *pluginWait}, NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(2), - Target: resForCU(1), + Current: resForCU(2), + Target: resForCU(1), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(1)) @@ -945,8 +1237,9 @@ func TestDownscalePivotBack(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: *pluginWait}, NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(2)) @@ -956,15 +1249,31 @@ func TestDownscalePivotBack(t *testing.T) { a.Do(state.NeonVM().RequestSuccessful, clock.Now()) }, }, + // NeonVM propagation + { + pre: func(_ *time.Duration, midAction func()) { + a.Do(state.UpdatedVM, helpers.CreateVmInfo( + DefaultInitialStateConfig.VM, + helpers.WithCurrentCU(1), + helpers.WithCurrentRevision(expectedRevision.WithTime()), + )) + latencyObserver.assert(duration("0.2s"), revsource.Downscale) + midAction() + }, + post: func(_ *time.Duration) { + // No action + }, + }, // plugin requests { pre: func(pluginWait *time.Duration, midRequest func()) { t.Log(" > start plugin downscale") a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(2)), - Target: resForCU(1), - Metrics: lo.ToPtr(initialMetrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(2)), + Target: resForCU(1), + Metrics: lo.ToPtr(initialMetrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, 
clock.Now(), resForCU(1)) @@ -973,7 +1282,7 @@ func TestDownscalePivotBack(t *testing.T) { halfClockTick() *pluginWait = duration("4.9s") // reset because we just made a request t.Log(" > finish plugin downscale") - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(1), Migrate: nil, }) @@ -982,16 +1291,17 @@ func TestDownscalePivotBack(t *testing.T) { t.Log(" > start plugin upscale") a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(1)), - Target: resForCU(2), - Metrics: lo.ToPtr(newMetrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(1)), + Target: resForCU(2), + Metrics: lo.ToPtr(newMetrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(2)) clockTick() *pluginWait = duration("4.9s") // reset because we just made a request t.Log(" > finish plugin upscale") - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(2), Migrate: nil, }) @@ -1004,11 +1314,15 @@ func TestDownscalePivotBack(t *testing.T) { // Initial setup clock = helpers.NewFakeClock(t) + expectedRevision = helpers.NewExpectedRevision(clock.Now) state = helpers.CreateInitialState( DefaultInitialStateConfig, helpers.WithStoredWarnings(a.StoredWarnings()), helpers.WithMinMaxCU(1, 3), helpers.WithCurrentCU(2), + helpers.WithConfigSetting(func(c *core.Config) { + c.RevisionSource = revsource.NewRevisionSource(0, latencyObserver.observe) + }), ) state.Monitor().Active(true) @@ -1023,6 +1337,10 @@ func TestDownscalePivotBack(t *testing.T) { a.Call(getDesiredResources, state, clock.Now()). Equals(resForCU(1)) + // We start with downscale + expectedRevision.Value = 1 + expectedRevision.Flags = revsource.Downscale + for j := 0; j <= i; j++ { midRequest := func() {} if j == i { @@ -1032,6 +1350,7 @@ func TestDownscalePivotBack(t *testing.T) { a.Do(state.UpdateSystemMetrics, newMetrics) a.Call(getDesiredResources, state, clock.Now()). 
Equals(resForCU(2)) + } } @@ -1039,6 +1358,10 @@ func TestDownscalePivotBack(t *testing.T) { } for j := i; j >= 0; j-- { + // Now it is upscale + expectedRevision.Value = 2 + expectedRevision.Flags = revsource.Upscale + steps[j].post(&pluginWait) } } @@ -1052,6 +1375,9 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) { clockTick := func() { clock.Inc(100 * time.Millisecond) } + expectedRevision := helpers.NewExpectedRevision(clock.Now) + latencyObserver := &latencyObserver{t: t, observations: nil} + defer latencyObserver.assertEmpty() resForCU := DefaultComputeUnit.Mul state := helpers.CreateInitialState( @@ -1059,6 +1385,9 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) { helpers.WithStoredWarnings(a.StoredWarnings()), helpers.WithMinMaxCU(1, 3), helpers.WithCurrentCU(2), + helpers.WithConfigSetting(func(config *core.Config) { + config.RevisionSource = revsource.NewRevisionSource(0, latencyObserver.observe) + }), ) nextActions := func() core.ActionSet { return state.NextActions(clock.Now()) @@ -1095,6 +1424,8 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) { )) // We should be making a vm-monitor downscaling request + expectedRevision.Value += 1 + expectedRevision.Flags = revsource.Downscale // TODO: In the future, we should have a "force-downscale" alternative so the vm-monitor doesn't // get to deny the downscaling. a.Call(nextActions).Equals(core.ActionSet{ @@ -1102,17 +1433,21 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) { MonitorDownscale: &core.ActionMonitorDownscale{ Current: resForCU(2), Target: resForCU(1), + + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingDownscaleRequest, clock.Now(), resForCU(1)) clockTick() - a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now()) + a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now(), expectedRevision.WithTime()) // Do NeonVM request for that downscaling a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.6s")}, NeonVMRequest: &core.ActionNeonVMRequest{ Current: resForCU(2), Target: resForCU(1), + + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(1)) @@ -1121,20 +1456,33 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) { // Do plugin request for that downscaling: a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(2)), - Target: resForCU(1), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(2)), + Target: resForCU(1), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(1)) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(1), Migrate: nil, }) + + // Update the VM to set currentCU==1 CU + clockTick() + a.Do(state.UpdatedVM, helpers.CreateVmInfo( + DefaultInitialStateConfig.VM, + helpers.WithCurrentCU(1), + helpers.WithMinMaxCU(1, 1), + helpers.WithCurrentRevision(expectedRevision.WithTime()), + )) + + latencyObserver.assert(duration("0.4s"), revsource.Downscale) + // And then, we shouldn't need to do anything else: a.Call(nextActions).Equals(core.ActionSet{ - Wait: &core.ActionWait{Duration: duration("4.9s")}, + Wait: &core.ActionWait{Duration: duration("4.8s")}, }) } @@ -1146,6 +1494,7 @@ func TestBoundsChangeRequiresUpscale(t 
*testing.T) { clockTick := func() { clock.Inc(100 * time.Millisecond) } + expectedRevision := helpers.NewExpectedRevision(clock.Now) resForCU := DefaultComputeUnit.Mul state := helpers.CreateInitialState( @@ -1191,14 +1540,15 @@ func TestBoundsChangeRequiresUpscale(t *testing.T) { // We should be making a plugin request to get upscaling: a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(2)), - Target: resForCU(3), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(2)), + Target: resForCU(3), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(3)) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(3), Migrate: nil, }) @@ -1206,8 +1556,9 @@ func TestBoundsChangeRequiresUpscale(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.9s")}, NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(2), - Target: resForCU(3), + Current: resForCU(2), + Target: resForCU(3), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(3)) @@ -1217,8 +1568,9 @@ func TestBoundsChangeRequiresUpscale(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.8s")}, MonitorUpscale: &core.ActionMonitorUpscale{ - Current: resForCU(2), - Target: resForCU(3), + Current: resForCU(2), + Target: resForCU(3), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingUpscaleRequest, clock.Now(), resForCU(3)) @@ -1237,6 +1589,7 @@ func TestFailedRequestRetry(t *testing.T) { clockTick := func() { clock.Inc(100 * time.Millisecond) } + expectedRevision := helpers.NewExpectedRevision(clock.Now) resForCU := DefaultComputeUnit.Mul state := helpers.CreateInitialState( @@ -1270,9 +1623,10 @@ func TestFailedRequestRetry(t *testing.T) { // We should be asking the scheduler for upscaling a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(1)), - Target: resForCU(2), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(1)), + Target: resForCU(2), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(2)) @@ -1289,14 +1643,15 @@ func TestFailedRequestRetry(t *testing.T) { // ... 
and then retry: a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(1)), - Target: resForCU(2), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(1)), + Target: resForCU(2), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(2)) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(2), Migrate: nil, }) @@ -1306,8 +1661,9 @@ func TestFailedRequestRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.9s")}, // plugin request tick NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(2)) @@ -1324,8 +1680,9 @@ func TestFailedRequestRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("1.8s")}, // plugin request tick NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(2)) @@ -1336,8 +1693,9 @@ func TestFailedRequestRetry(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("1.7s")}, // plugin request tick MonitorUpscale: &core.ActionMonitorUpscale{ - Current: resForCU(1), - Target: resForCU(2), + Current: resForCU(1), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) } @@ -1352,6 +1710,7 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { clockTick := func() { clock.Inc(100 * time.Millisecond) } + expectedRevision := helpers.NewExpectedRevision(clock.Now) resForCU := DefaultComputeUnit.Mul state := helpers.CreateInitialState( @@ -1373,14 +1732,15 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { Call(state.NextActions, clock.Now()). 
Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: nil, - Target: resForCU(3), - Metrics: nil, + LastPermit: nil, + Target: resForCU(3), + Metrics: nil, + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(3)) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(3), Migrate: nil, }) @@ -1393,8 +1753,9 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.8s")}, MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(3), - Target: resForCU(2), + Current: resForCU(3), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Monitor().StartingDownscaleRequest, clock.Now(), resForCU(2)) @@ -1419,16 +1780,18 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { // When the vm-monitor request finishes, we want to both // (a) request additional downscaling from vm-monitor, and // (b) make a NeonVM request for the initially approved downscaling - a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now()) + a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now(), expectedRevision.WithTime()) a.Call(nextActions).Equals(core.ActionSet{ Wait: &core.ActionWait{Duration: duration("4.6s")}, // plugin request tick wait NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(3), - Target: resForCU(2), + Current: resForCU(3), + Target: resForCU(2), + TargetRevision: expectedRevision.WithTime(), }, MonitorDownscale: &core.ActionMonitorDownscale{ - Current: resForCU(2), - Target: resForCU(1), + Current: resForCU(2), + Target: resForCU(1), + TargetRevision: expectedRevision.WithTime(), }, }) // Start both requests. The vm-monitor request will finish first, but after that we'll just be @@ -1438,7 +1801,7 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { clockTick() - a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now()) + a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now(), expectedRevision.WithTime()) a. WithWarnings( "Wanted to make a request to NeonVM API, but there's already NeonVM request (for different resources) ongoing", @@ -1459,13 +1822,15 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { // incorrectly for 1 CU, rather than 2 CU. So, the rest of this test case is mostly just // rounding out the rest of the scale-down routine. 
PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(3)), - Target: resForCU(2), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(3)), + Target: resForCU(2), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, NeonVMRequest: &core.ActionNeonVMRequest{ - Current: resForCU(2), - Target: resForCU(1), + Current: resForCU(2), + Target: resForCU(1), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(2)) @@ -1473,7 +1838,7 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(2), Migrate: nil, }) @@ -1489,16 +1854,17 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { a.Do(state.NeonVM().RequestSuccessful, clock.Now()) a.Call(nextActions).Equals(core.ActionSet{ PluginRequest: &core.ActionPluginRequest{ - LastPermit: lo.ToPtr(resForCU(2)), - Target: resForCU(1), - Metrics: lo.ToPtr(metrics.ToAPI()), + LastPermit: lo.ToPtr(resForCU(2)), + Target: resForCU(1), + Metrics: lo.ToPtr(metrics.ToAPI()), + TargetRevision: expectedRevision.WithTime(), }, }) a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(1)) clockTick() - a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{ + a.NoError(state.Plugin().RequestSuccessful, clock.Now(), expectedRevision.WithTime(), api.PluginResponse{ Permit: resForCU(1), Migrate: nil, }) diff --git a/pkg/agent/core/testhelpers/construct.go b/pkg/agent/core/testhelpers/construct.go index 4aced58a5..a97e8a10b 100644 --- a/pkg/agent/core/testhelpers/construct.go +++ b/pkg/agent/core/testhelpers/construct.go @@ -86,6 +86,7 @@ func CreateVmInfo(config InitialVmInfoConfig, opts ...VmInfoOpt) api.VmInfo { ScalingConfig: nil, ScalingEnabled: true, }, + CurrentRevision: nil, } for _, o := range opts { @@ -158,3 +159,9 @@ func WithCurrentCU(cu uint16) VmInfoOpt { vm.SetUsing(c.ComputeUnit.Mul(cu)) }) } + +func WithCurrentRevision(rev vmapi.RevisionWithTime) VmInfoOpt { + return vmInfoModifier(func(c InitialVmInfoConfig, vm *api.VmInfo) { + vm.CurrentRevision = &rev + }) +} diff --git a/pkg/agent/core/testhelpers/revision.go b/pkg/agent/core/testhelpers/revision.go new file mode 100644 index 000000000..3c0106b39 --- /dev/null +++ b/pkg/agent/core/testhelpers/revision.go @@ -0,0 +1,33 @@ +package testhelpers + +import ( + "time" + + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" +) + +type ExpectedRevision struct { + vmv1.Revision + Now func() time.Time +} + +func NewExpectedRevision(now func() time.Time) *ExpectedRevision { + return &ExpectedRevision{ + Now: now, + Revision: vmv1.ZeroRevision, + } +} + +func (e *ExpectedRevision) WithTime() vmv1.RevisionWithTime { + return e.Revision.WithTime(e.Now()) +} + +type NilRevisionSource struct{} + +func (c *NilRevisionSource) Next(_ time.Time, _ vmv1.Flag) vmv1.Revision { + return vmv1.Revision{ + Value: 0, + Flags: 0, + } +} +func (c *NilRevisionSource) Observe(_ time.Time, _ vmv1.Revision) error { return nil } diff --git a/pkg/agent/execbridge.go b/pkg/agent/execbridge.go index 7ea6d89dc..78b9b79e9 100644 --- a/pkg/agent/execbridge.go +++ b/pkg/agent/execbridge.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" 
"github.com/neondatabase/autoscaling/pkg/agent/executor" "github.com/neondatabase/autoscaling/pkg/api" ) @@ -86,10 +87,15 @@ func makeNeonVMInterface(r *Runner) *execNeonVMInterface { } // Request implements executor.NeonVMInterface -func (iface *execNeonVMInterface) Request(ctx context.Context, logger *zap.Logger, current, target api.Resources) error { +func (iface *execNeonVMInterface) Request( + ctx context.Context, + logger *zap.Logger, + current, target api.Resources, + targetRevision vmv1.RevisionWithTime, +) error { iface.runner.recordResourceChange(current, target, iface.runner.global.metrics.neonvmRequestedChange) - err := iface.runner.doNeonVMRequest(ctx, target) + err := iface.runner.doNeonVMRequest(ctx, target, targetRevision) if err != nil { iface.runner.status.update(iface.runner.global, func(ps podStatus) podStatus { ps.failedNeonVMRequestCounter.Inc() diff --git a/pkg/agent/executor/exec_monitor.go b/pkg/agent/executor/exec_monitor.go index 4431a5bcb..882d262a6 100644 --- a/pkg/agent/executor/exec_monitor.go +++ b/pkg/agent/executor/exec_monitor.go @@ -99,14 +99,14 @@ func (c *ExecutorCoreWithClients) DoMonitorDownscales(ctx context.Context, logge if !result.Ok { logger.Warn("vm-monitor denied downscale", logFields...) if unchanged { - state.Monitor().DownscaleRequestDenied(endTime) + state.Monitor().DownscaleRequestDenied(endTime, action.TargetRevision) } else { warnSkipBecauseChanged() } } else { logger.Info("vm-monitor approved downscale", logFields...) if unchanged { - state.Monitor().DownscaleRequestAllowed(endTime) + state.Monitor().DownscaleRequestAllowed(endTime, action.TargetRevision) } else { warnSkipBecauseChanged() } diff --git a/pkg/agent/executor/exec_neonvm.go b/pkg/agent/executor/exec_neonvm.go index 7d4eecd4a..fcd6a8811 100644 --- a/pkg/agent/executor/exec_neonvm.go +++ b/pkg/agent/executor/exec_neonvm.go @@ -6,13 +6,19 @@ import ( "go.uber.org/zap" + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/agent/core" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/util" ) type NeonVMInterface interface { - Request(_ context.Context, _ *zap.Logger, current, target api.Resources) error + Request( + _ context.Context, + _ *zap.Logger, + current, target api.Resources, + targetRevision vmv1.RevisionWithTime, + ) error } func (c *ExecutorCoreWithClients) DoNeonVMRequests(ctx context.Context, logger *zap.Logger) { @@ -46,8 +52,10 @@ func (c *ExecutorCoreWithClients) DoNeonVMRequests(ctx context.Context, logger * continue // state has changed, retry. } - err := c.clients.NeonVM.Request(ctx, ifaceLogger, action.Current, action.Target) endTime := time.Now() + targetRevision := action.TargetRevision.WithTime(endTime) + err := c.clients.NeonVM.Request(ctx, ifaceLogger, action.Current, action.Target, targetRevision) + logFields := []zap.Field{zap.Object("action", action), zap.Duration("duration", endTime.Sub(startTime))} c.update(func(state *core.State) { diff --git a/pkg/agent/executor/exec_plugin.go b/pkg/agent/executor/exec_plugin.go index 66bcf5dc0..e9abd884b 100644 --- a/pkg/agent/executor/exec_plugin.go +++ b/pkg/agent/executor/exec_plugin.go @@ -61,7 +61,7 @@ func (c *ExecutorCoreWithClients) DoPluginRequests(ctx context.Context, logger * } else { logFields = append(logFields, zap.Any("response", resp)) logger.Info("Plugin request successful", logFields...) 
- if err := state.Plugin().RequestSuccessful(endTime, *resp); err != nil { + if err := state.Plugin().RequestSuccessful(endTime, action.TargetRevision, *resp); err != nil { logger.Error("Plugin response validation failed", append(logFields, zap.Error(err))...) } } diff --git a/pkg/agent/prommetrics.go b/pkg/agent/prommetrics.go index 4a004351c..95ae95435 100644 --- a/pkg/agent/prommetrics.go +++ b/pkg/agent/prommetrics.go @@ -1,9 +1,13 @@ package agent import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" + "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" "github.com/neondatabase/autoscaling/pkg/util" ) @@ -26,6 +30,23 @@ type GlobalMetrics struct { runnerStarts prometheus.Counter runnerRestarts prometheus.Counter runnerNextActions prometheus.Counter + + scalingLatency prometheus.HistogramVec + pluginLatency prometheus.HistogramVec + monitorLatency prometheus.HistogramVec + neonvmLatency prometheus.HistogramVec +} + +func (m *GlobalMetrics) PluginLatency() *prometheus.HistogramVec { + return &m.pluginLatency +} + +func (m *GlobalMetrics) MonitorLatency() *prometheus.HistogramVec { + return &m.monitorLatency +} + +func (m *GlobalMetrics) NeonVMLatency() *prometheus.HistogramVec { + return &m.neonvmLatency } type resourceChangePair struct { @@ -34,9 +55,11 @@ type resourceChangePair struct { } const ( - directionLabel = "direction" - directionValueInc = "inc" - directionValueDec = "dec" + directionLabel = "direction" + directionValueInc = "inc" + directionValueDec = "dec" + directionValueBoth = "both" + directionValueNone = "none" ) type runnerMetricState string @@ -48,6 +71,11 @@ const ( runnerMetricStatePanicked runnerMetricState = "panicked" ) +// Copied bucket values from controller runtime latency metric. We can +// adjust them in the future if needed. +var buckets = []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, + 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60} + func makeGlobalMetrics() (GlobalMetrics, *prometheus.Registry) { reg := prometheus.NewRegistry() @@ -217,6 +245,39 @@ func makeGlobalMetrics() (GlobalMetrics, *prometheus.Registry) { Help: "Number of times (*core.State).NextActions() has been called", }, )), + + scalingLatency: *util.RegisterMetric(reg, prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "autoscaling_agent_scaling_latency_seconds", + Help: "End-to-end scaling latency", + Buckets: buckets, + }, + []string{directionLabel}, + )), + pluginLatency: *util.RegisterMetric(reg, prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "autoscaling_agent_plugin_latency_seconds", + Help: "Plugin request latency", + Buckets: buckets, + }, + []string{directionLabel}, + )), + monitorLatency: *util.RegisterMetric(reg, prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "autoscaling_agent_monitor_latency_seconds", + Help: "Monitor request latency", + Buckets: buckets, + }, + []string{directionLabel}, + )), + neonvmLatency: *util.RegisterMetric(reg, prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "autoscaling_agent_neonvm_latency_seconds", + Help: "NeonVM request latency", + Buckets: buckets, + }, + []string{directionLabel}, + )), } // Some of of the metrics should have default keys set to zero. 
Otherwise, these won't be filled @@ -253,6 +314,25 @@ func makeGlobalMetrics() (GlobalMetrics, *prometheus.Registry) { return metrics, reg } +func flagsToDirection(flags vmv1.Flag) string { + if flags.Has(revsource.Upscale) && flags.Has(revsource.Downscale) { + return directionValueBoth + } + if flags.Has(revsource.Upscale) { + return directionValueInc + } + if flags.Has(revsource.Downscale) { + return directionValueDec + } + return directionValueNone +} + +func WrapHistogramVec(hist *prometheus.HistogramVec) revsource.ObserveCallback { + return func(dur time.Duration, flags vmv1.Flag) { + hist.WithLabelValues(flagsToDirection(flags)).Observe(dur.Seconds()) + } +} + type PerVMMetrics struct { cpu *prometheus.GaugeVec memory *prometheus.GaugeVec diff --git a/pkg/agent/runner.go b/pkg/agent/runner.go index d7e89df7b..2a282a745 100644 --- a/pkg/agent/runner.go +++ b/pkg/agent/runner.go @@ -31,7 +31,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/agent/core" + "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" "github.com/neondatabase/autoscaling/pkg/agent/executor" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/api" @@ -194,7 +196,14 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util pluginRequestJitter := util.NewTimeRange(time.Millisecond, 0, 100).Random() coreExecLogger := execLogger.Named("core") - executorCore := executor.NewExecutorCore(coreExecLogger, getVmInfo(), executor.Config{ + + vmInfo := getVmInfo() + var initialRevision int64 + if vmInfo.CurrentRevision != nil { + initialRevision = vmInfo.CurrentRevision.Value + } + revisionSource := revsource.NewRevisionSource(initialRevision, WrapHistogramVec(&r.global.metrics.scalingLatency)) + executorCore := executor.NewExecutorCore(coreExecLogger, vmInfo, executor.Config{ OnNextActions: r.global.metrics.runnerNextActions.Inc, Core: core.Config{ ComputeUnit: r.global.config.Scaling.ComputeUnit, @@ -210,6 +219,12 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util Info: coreExecLogger.Info, Warn: coreExecLogger.Warn, }, + RevisionSource: revisionSource, + ObservabilityCallbacks: core.ObservabilityCallbacks{ + PluginLatency: WrapHistogramVec(&r.global.metrics.pluginLatency), + MonitorLatency: WrapHistogramVec(&r.global.metrics.monitorLatency), + NeonVMLatency: WrapHistogramVec(&r.global.metrics.neonvmLatency), + }, }, }) @@ -625,7 +640,11 @@ func doMetricsRequest( return nil } -func (r *Runner) doNeonVMRequest(ctx context.Context, target api.Resources) error { +func (r *Runner) doNeonVMRequest( + ctx context.Context, + target api.Resources, + targetRevision vmv1.RevisionWithTime, +) error { patches := []patch.Operation{{ Op: patch.OpReplace, Path: "/spec/guest/cpus/use", @@ -634,6 +653,10 @@ func (r *Runner) doNeonVMRequest(ctx context.Context, target api.Resources) erro Op: patch.OpReplace, Path: "/spec/guest/memorySlots/use", Value: uint32(target.Mem / r.memSlotSize), + }, { + Op: patch.OpReplace, + Path: "/spec/targetRevision", + Value: targetRevision, }} patchPayload, err := json.Marshal(patches) diff --git a/pkg/api/vminfo.go b/pkg/api/vminfo.go index 438be5587..6b19233e6 100644 --- a/pkg/api/vminfo.go +++ b/pkg/api/vminfo.go @@ -53,11 +53,12 @@ func HasAlwaysMigrateLabel(obj metav1.ObjectMetaAccessor) bool { // care about. 
It takes various labels and annotations into account, so certain fields might be // different from what's strictly in the VirtualMachine object. type VmInfo struct { - Name string `json:"name"` - Namespace string `json:"namespace"` - Cpu VmCpuInfo `json:"cpu"` - Mem VmMemInfo `json:"mem"` - Config VmConfig `json:"config"` + Name string `json:"name"` + Namespace string `json:"namespace"` + Cpu VmCpuInfo `json:"cpu"` + Mem VmMemInfo `json:"mem"` + Config VmConfig `json:"config"` + CurrentRevision *vmapi.RevisionWithTime `json:"currentRevision,omitempty"` } type VmCpuInfo struct { @@ -150,7 +151,13 @@ func (vm VmInfo) NamespacedName() util.NamespacedName { func ExtractVmInfo(logger *zap.Logger, vm *vmapi.VirtualMachine) (*VmInfo, error) { logger = logger.With(util.VMNameFields(vm)) - return extractVmInfoGeneric(logger, vm.Name, vm, vm.Spec.Resources()) + info, err := extractVmInfoGeneric(logger, vm.Name, vm, vm.Spec.Resources()) + if err != nil { + return nil, fmt.Errorf("error extracting VM info: %w", err) + } + + info.CurrentRevision = vm.Status.CurrentRevision + return info, nil } func ExtractVmInfoFromPod(logger *zap.Logger, pod *corev1.Pod) (*VmInfo, error) { @@ -191,6 +198,7 @@ func extractVmInfoGeneric( ScalingEnabled: scalingEnabled, ScalingConfig: nil, // set below, maybe }, + CurrentRevision: nil, // set later, maybe } if boundsJSON, ok := obj.GetObjectMeta().GetAnnotations()[AnnotationAutoscalingBounds]; ok { diff --git a/tests/e2e/autoscaling/00-assert.yaml b/tests/e2e/autoscaling.default/00-assert.yaml similarity index 100% rename from tests/e2e/autoscaling/00-assert.yaml rename to tests/e2e/autoscaling.default/00-assert.yaml diff --git a/tests/e2e/autoscaling/00-create-vm.yaml b/tests/e2e/autoscaling.default/00-create-vm.yaml similarity index 100% rename from tests/e2e/autoscaling/00-create-vm.yaml rename to tests/e2e/autoscaling.default/00-create-vm.yaml diff --git a/tests/e2e/autoscaling/01-assert.yaml b/tests/e2e/autoscaling.default/01-assert.yaml similarity index 100% rename from tests/e2e/autoscaling/01-assert.yaml rename to tests/e2e/autoscaling.default/01-assert.yaml diff --git a/tests/e2e/autoscaling/01-upscale.yaml b/tests/e2e/autoscaling.default/01-upscale.yaml similarity index 100% rename from tests/e2e/autoscaling/01-upscale.yaml rename to tests/e2e/autoscaling.default/01-upscale.yaml diff --git a/tests/e2e/autoscaling/02-assert.yaml b/tests/e2e/autoscaling.default/02-assert.yaml similarity index 100% rename from tests/e2e/autoscaling/02-assert.yaml rename to tests/e2e/autoscaling.default/02-assert.yaml diff --git a/tests/e2e/autoscaling/02-downscale.yaml b/tests/e2e/autoscaling.default/02-downscale.yaml similarity index 100% rename from tests/e2e/autoscaling/02-downscale.yaml rename to tests/e2e/autoscaling.default/02-downscale.yaml diff --git a/tests/e2e/autoscaling.revisions/00-assert.yaml b/tests/e2e/autoscaling.revisions/00-assert.yaml new file mode 100644 index 000000000..b6bdbee5c --- /dev/null +++ b/tests/e2e/autoscaling.revisions/00-assert.yaml @@ -0,0 +1,21 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +status: + phase: Running + restartCount: 0 + conditions: + - type: Available + status: "True" + cpus: 250m + memorySize: 1Gi + memoryProvider: DIMMSlots + currentRevision: + revision: + value: 123 + flags: 456 diff --git a/tests/e2e/autoscaling.revisions/00-create-vm.yaml b/tests/e2e/autoscaling.revisions/00-create-vm.yaml new file mode 100644 index 
000000000..9b480f41c --- /dev/null +++ b/tests/e2e/autoscaling.revisions/00-create-vm.yaml @@ -0,0 +1,105 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +unitTest: false +--- +apiVersion: v1 +kind: Service +metadata: + name: example +spec: + ports: + - name: postgres + port: 5432 + protocol: TCP + targetPort: postgres + type: NodePort + selector: + vm.neon.tech/name: example +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example + labels: + autoscaling.neon.tech/enabled: "true" + annotations: + autoscaling.neon.tech/bounds: '{ "min": { "cpu": "250m", "mem": "1Gi" }, "max": {"cpu": 1, "mem": "4Gi" } }' +spec: + schedulerName: autoscale-scheduler + targetRevision: + revision: # Just arbitrary values + value: 123 + flags: 456 + updatedAt: 2006-01-02T15:04:05Z + guest: + cpus: + min: 0.25 + max: 1.25 # set value greater than bounds so our tests check we don't exceed the bounds. + use: 0.25 + memorySlotSize: 1Gi + memorySlots: + min: 1 + max: 5 + use: 1 + rootDisk: + image: vm-postgres:15-bullseye + size: 8Gi + args: + - -c + - 'config_file=/etc/postgresql/postgresql.conf' + env: + # for testing only - allows login without password + - name: POSTGRES_HOST_AUTH_METHOD + value: trust + ports: + - name: postgres + port: 5432 + - name: host-metrics + port: 9100 + - name: monitor + port: 10301 + extraNetwork: + enable: true + disks: + - name: pgdata + mountPath: /var/lib/postgresql + emptyDisk: + size: 16Gi + - name: postgres-config + mountPath: /etc/postgresql + configMap: + name: example-config + items: + - key: postgresql.conf + path: postgresql.conf + - name: cache + mountPath: /neonvm/cache + tmpfs: + size: 1Gi + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: example-config +data: + postgresql.conf: | + listen_addresses = '*' + shared_preload_libraries = 'pg_stat_statements' + + max_connections = 64 + shared_buffers = 256MB + effective_cache_size = 1536MB + maintenance_work_mem = 128MB + checkpoint_completion_target = 0.9 + wal_buffers = 16MB + default_statistics_target = 100 + random_page_cost = 1.1 + effective_io_concurrency = 200 + work_mem = 4MB + min_wal_size = 1GB + max_wal_size = 4GB + max_worker_processes = 4 + max_parallel_workers_per_gather = 2 + max_parallel_workers = 4 + max_parallel_maintenance_workers = 2 diff --git a/tests/e2e/autoscaling.revisions/01-assert.yaml b/tests/e2e/autoscaling.revisions/01-assert.yaml new file mode 100644 index 000000000..a65939fea --- /dev/null +++ b/tests/e2e/autoscaling.revisions/01-assert.yaml @@ -0,0 +1,13 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +status: + currentRevision: + revision: + value: 1 + flags: 123 diff --git a/tests/e2e/autoscaling.revisions/01-change-target-revision.yaml b/tests/e2e/autoscaling.revisions/01-change-target-revision.yaml new file mode 100644 index 000000000..716d7b309 --- /dev/null +++ b/tests/e2e/autoscaling.revisions/01-change-target-revision.yaml @@ -0,0 +1,17 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +unitTest: false +--- + +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +spec: + targetRevision: + # Note that revision goes backward, compared with the previous step. + # This is intentional, in case it races with autoscaler-agent restarts. 
+ revision: + value: 1 + flags: 123 + updatedAt: 2020-01-02T15:04:05Z diff --git a/tests/e2e/autoscaling.revisions/02-assert.yaml b/tests/e2e/autoscaling.revisions/02-assert.yaml new file mode 100644 index 000000000..b8b35bb47 --- /dev/null +++ b/tests/e2e/autoscaling.revisions/02-assert.yaml @@ -0,0 +1,32 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +spec: + targetRevision: + revision: + value: 124 # we had 123 as the initial revision + flags: 1 # 1 for Upscale +status: + phase: Running + restartCount: 0 + conditions: + - type: Available + status: "True" + cpus: 1 + memorySize: 4Gi + currentRevision: # Already propagated from above + revision: + value: 124 + flags: 1 +--- +apiVersion: v1 +kind: Pod +metadata: + name: workload +status: + phase: Running diff --git a/tests/e2e/autoscaling.revisions/02-upscale.yaml b/tests/e2e/autoscaling.revisions/02-upscale.yaml new file mode 100644 index 000000000..d95636340 --- /dev/null +++ b/tests/e2e/autoscaling.revisions/02-upscale.yaml @@ -0,0 +1,49 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +unitTest: false +--- +apiVersion: v1 +kind: Pod +metadata: + name: workload +spec: + terminationGracePeriodSeconds: 1 + initContainers: + - name: wait-for-pg + image: postgres:15-bullseye + command: + - sh + - "-c" + - | + set -e + until pg_isready --username=postgres --dbname=postgres --host=example --port=5432; do + sleep 1 + done + containers: + - name: pgbench + image: postgres:15-bullseye + volumeMounts: + - name: my-volume + mountPath: /etc/misc + command: + - pgbench + args: + - postgres://postgres@example:5432/postgres + - --client=20 + - --progress=1 + - --progress-timestamp + - --time=600 + - --file=/etc/misc/query.sql + volumes: + - name: my-volume + configMap: + name: query + restartPolicy: Never +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: query +data: + query.sql: | + select length(factorial(length(factorial(1223)::text)/2)::text);
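
Taken together, the changes above wire up an end-to-end path for revisions: the autoscaler-agent draws a new revision from its RevisionSource whenever the desired scaling direction changes, stamps it with a time, attaches it to the plugin/vm-monitor/NeonVM requests, and records the propagation latency once the VM reports the same revision back in its status. A minimal sketch of that flow, assuming revsource.RevisionSource exposes the same Next/Observe signatures as the NilRevisionSource test helper above:

// Illustrative sketch, not part of the diff. The Next/Observe signatures are
// inferred from the NilRevisionSource helper and the unit tests in this PR.
package main

import (
	"fmt"
	"time"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
	"github.com/neondatabase/autoscaling/pkg/agent/core/revsource"
)

func main() {
	// Latency callback; in the agent this role is played by
	// WrapHistogramVec(&metrics.scalingLatency).
	cb := func(dur time.Duration, flags vmv1.Flag) {
		fmt.Printf("scaling latency: %v (flags=%d)\n", dur, uint64(flags))
	}

	// The runner seeds the source with Status.CurrentRevision.Value, or 0 if unset.
	src := revsource.NewRevisionSource(0, cb)

	start := time.Now()

	// A change in desired scaling advances the revision, tagged with a direction flag.
	rev := src.Next(start, revsource.Upscale)

	// The revision is stamped with a time and sent as TargetRevision on outgoing
	// requests (see doNeonVMRequest above, which patches /spec/targetRevision).
	_ = rev.WithTime(start)

	// Once the VM reports the same revision in its status, the agent observes the
	// propagation time, which fires cb with the revision's flags.
	_ = src.Observe(start.Add(200*time.Millisecond), rev)
}

In the agent, the callback has the same shape as what WrapHistogramVec produces, so the observed duration lands in the scaling-latency histogram under an inc/dec/both/none direction label derived from the flags by flagsToDirection.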