agent/core: Implement LFC-aware scaling (#1003)
Part of #872.
This builds on the metrics exposed by neondatabase/neon#8298.

For now, we only look at the working set size metrics over various time
windows.

The algorithm itself is fairly straightforward to implement (see wss.go), but
unfortunately it's difficult to understand *why* it's expected to work.

See also: https://www.notion.so/neondatabase/cca38138fadd45eaa753d81b859490c6
sharnoff authored Jul 23, 2024
1 parent 4395a93 commit 3b8e2b2
Showing 8 changed files with 542 additions and 41 deletions.
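
At a high level, the new scaling path added in pkg/agent/core/state.go works roughly like this: the LFC exposes an estimate of the working set size over each of the last N one-minute windows; the agent estimates the "true" working set from where that curve levels off, projects how high it is likely to get over the next minute, and then picks enough compute units that the projection fits within the fraction of VM memory the LFC is assumed to use. Below is a minimal sketch of that flow. It is not the real wss.go (which isn't rendered on this page): the estimator and projection helpers are placeholders whose signatures are only inferred from the call sites in state.go further down.

```go
// Sketch only - not the real wss.go. EstimateTrueWorkingSetSize and
// ProjectNextHighest are placeholders; only their signatures are inferred
// from how state.go calls them.
package sketch

import "math"

type WssEstimatorConfig struct {
	MaxAllowedIncreaseFactor float64 // how much growth the estimator tolerates within a window
	InitialOffset            int     // index of the first bucket to consider
	WindowSize               int     // number of 1-minute buckets per window
}

// Placeholder body; the real estimator lives in wss.go.
func EstimateTrueWorkingSetSize(values []float64, cfg WssEstimatorConfig) float64 {
	if len(values) == 0 {
		return 0
	}
	return values[len(values)-1]
}

// Placeholder body; the real projection lives in wss.go.
func ProjectNextHighest(values []float64, projectLen float64) float64 {
	if len(values) == 0 {
		return 0
	}
	return values[len(values)-1]
}

// goalCUFromWss mirrors the new branch in desiredResourcesFromMetricsOrRequestedUpscaling.
// wssValues[i] is the working set size, in 8 KiB pages, over an (i+1)-minute window;
// the real code first checks that len(wssValues) >= offset+windowMinutes.
func goalCUFromWss(wssValues []float64, minWaitMinutes, windowMinutes int, lfcToMemoryRatio, memPerCU float64) uint32 {
	offset := minWaitMinutes - 1 // -1 because the buckets start at the 1-minute window
	estimate := EstimateTrueWorkingSetSize(wssValues, WssEstimatorConfig{
		MaxAllowedIncreaseFactor: 3.0,
		InitialOffset:            offset,
		WindowSize:               windowMinutes,
	})

	// Project only over the prefix of buckets that hasn't already exceeded the estimate.
	end := offset
	for end < len(wssValues) && wssValues[end] <= estimate {
		end++
	}
	predicted := ProjectNextHighest(wssValues[:end], 0.5)

	// pages -> bytes -> compute units, then undo the discount from only part of
	// the VM's memory going to the LFC.
	return uint32(math.Ceil(predicted * 8192 / memPerCU / lfcToMemoryRatio))
}
```
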
7 changes: 5 additions & 2 deletions deploy/agent/config_map.yaml
@@ -12,7 +12,10 @@ data:
"defaultConfig": {
"loadAverageFractionTarget": 0.9,
"memoryUsageFractionTarget": 0.75,
"enableLFCMetrics": false
"enableLFCMetrics": false,
"lfcToMemoryRatio": 0.75,
"lfcWindowSizeMinutes": 5,
"lfcMinWaitBeforeDownscaleMinutes": 5
}
},
"billing": {
@@ -45,7 +48,7 @@ data:
"secondsBetweenRequests": 5
},
"lfc": {
"port": 9399,
"port": 9499,
"requestTimeoutSeconds": 5,
"secondsBetweenRequests": 15
}
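The three new keys feed the LFC branch of the scaling logic in state.go (via s.scalingConfig()): roughly, lfcToMemoryRatio is the fraction of the VM's memory the LFC is assumed to be able to use, lfcWindowSizeMinutes is the width of the window used when estimating the working set size, and lfcMinWaitBeforeDownscaleMinutes controls where the estimate starts, i.e. how long the working set must have stayed small before downscaling is considered. A sketch of the corresponding config fields is below; the field names match the dereferences in state.go, but the struct's real name, location, and JSON tags aren't part of this page and are assumptions here.

```go
// Sketch of the LFC-related scaling config fields. The struct and package are
// hypothetical; the field names match how state.go dereferences them
// (*cfg.LFCToMemoryRatio, *cfg.LFCWindowSizeMinutes, ...), and the JSON tags
// mirror the keys in the config map above.
package config

type ScalingConfig struct {
	LoadAverageFractionTarget *float64 `json:"loadAverageFractionTarget,omitempty"`
	MemoryUsageFractionTarget *float64 `json:"memoryUsageFractionTarget,omitempty"`

	// Whether to fetch LFC metrics and use them for scaling decisions.
	EnableLFCMetrics *bool `json:"enableLFCMetrics,omitempty"`
	// Fraction of the VM's memory the LFC is assumed to be able to use.
	LFCToMemoryRatio *float64 `json:"lfcToMemoryRatio,omitempty"`
	// Width, in minutes, of the window used when estimating the working set size.
	LFCWindowSizeMinutes *int `json:"lfcWindowSizeMinutes,omitempty"`
	// How long the working set must have stayed small before downscaling is considered.
	LFCMinWaitBeforeDownscaleMinutes *int `json:"lfcMinWaitBeforeDownscaleMinutes,omitempty"`
}
```
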
1 change: 1 addition & 0 deletions pkg/agent/core/dumpstate.go
@@ -40,6 +40,7 @@ func (s *State) Dump() StateDump {
Monitor: s.internal.Monitor.deepCopy(),
NeonVM: s.internal.NeonVM.deepCopy(),
Metrics: shallowCopy[SystemMetrics](s.internal.Metrics),
LFCMetrics: shallowCopy[LFCMetrics](s.internal.LFCMetrics),
TargetRevision: s.internal.TargetRevision,
LastDesiredResources: s.internal.LastDesiredResources,
},
82 changes: 80 additions & 2 deletions pkg/agent/core/metrics.go
@@ -3,8 +3,12 @@ package core
// Definition of the Metrics type, plus reading it from vector.dev's prometheus format host metrics

import (
"cmp"
"fmt"
"io"
"slices"
"strconv"
"time"

promtypes "github.com/prometheus/client_model/go"
promfmt "github.com/prometheus/common/expfmt"
@@ -31,7 +35,9 @@ type LFCMetrics struct {
CacheMissesTotal float64
CacheWritesTotal float64

ApproximateWorkingSetSizeTotal float64 // approximate_working_set_size
// lfc_approximate_working_set_size_windows; currently requires that values are spaced exactly
// one minute apart
ApproximateworkingSetSizeBuckets []float64
}

// FromPrometheus represents metric types that can be parsed from prometheus output.
@@ -118,12 +124,15 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) erro
}
}

wssBuckets, err := extractWorkingSetSizeWindows(mfs)
ec.Add(err)

tmp := LFCMetrics{
CacheHitsTotal: getFloat("lfc_hits"),
CacheMissesTotal: getFloat("lfc_misses"),
CacheWritesTotal: getFloat("lfc_writes"),

ApproximateWorkingSetSizeTotal: getFloat("lfc_approximate_working_set_size"),
ApproximateworkingSetSizeBuckets: wssBuckets,
}

if err := ec.Resolve(); err != nil {
@@ -133,3 +142,72 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) erro
*m = tmp
return nil
}

func extractWorkingSetSizeWindows(mfs map[string]*promtypes.MetricFamily) ([]float64, error) {
metricName := "lfc_approximate_working_set_size_windows"
mf := mfs[metricName]
if mf == nil {
return nil, missingMetric(metricName)
}

if mf.GetType() != promtypes.MetricType_GAUGE {
return nil, fmt.Errorf("wrong metric type: expected %s, but got %s", promtypes.MetricType_GAUGE, mf.GetType())
} else if len(mf.Metric) < 1 {
return nil, fmt.Errorf("expected >= metric, found %d", len(mf.Metric))
}

type pair struct {
duration time.Duration
value float64
}

var pairs []pair
for _, m := range mf.Metric {
// Find the duration label
durationLabel := "duration_seconds"
durationIndex := slices.IndexFunc(m.Label, func(l *promtypes.LabelPair) bool {
return l.GetName() == durationLabel
})
if durationIndex == -1 {
return nil, fmt.Errorf("metric missing label %q", durationLabel)
}

durationSeconds, err := strconv.Atoi(m.Label[durationIndex].GetValue())
if err != nil {
return nil, fmt.Errorf("couldn't parse metric's %q label as int: %w", durationLabel, err)
}

pairs = append(pairs, pair{
duration: time.Second * time.Duration(durationSeconds),
value: m.GetGauge().GetValue(),
})
}

slices.SortFunc(pairs, func(x, y pair) int {
return cmp.Compare(x.duration, y.duration)
})

// Check that the values are as expected: they should all be 1 minute apart, starting
// at 1 minute.
// NOTE: this assumption is relied on elsewhere for scaling on ApproximateworkingSetSizeBuckets.
// Please search for usages before changing this behavior.
if pairs[0].duration != time.Minute {
return nil, fmt.Errorf("expected smallest duration to be %v, got %v", time.Minute, pairs[0].duration)
}
for i := range pairs {
expected := time.Minute * time.Duration(i+1)
if pairs[i].duration != expected {
return nil, fmt.Errorf(
"expected duration values to be exactly 1m apart, got unexpected value %v instead of %v",
pairs[i].duration,
expected,
)
}
}

var values []float64
for _, p := range pairs {
values = append(values, p.value)
}
return values, nil
}
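
For reference, here is roughly what the exposition consumed by extractWorkingSetSizeWindows looks like and how it maps onto ApproximateworkingSetSizeBuckets. The sample text and values below are invented; the sketch assumes the usual expfmt.TextParser path (suggested by the promfmt import above) and is written as if it were a file in package core so it can reach the unexported function.

```go
// Sketch only: written as if it were a file in package core next to metrics.go,
// so it can call the unexported extractWorkingSetSizeWindows. The exposition
// text and its values are invented.
package core

import (
	"strings"

	promfmt "github.com/prometheus/common/expfmt"
)

func exampleExtractWorkingSetSizeWindows() ([]float64, error) {
	exposition := `# TYPE lfc_approximate_working_set_size_windows gauge
lfc_approximate_working_set_size_windows{duration_seconds="60"} 1200
lfc_approximate_working_set_size_windows{duration_seconds="120"} 1450
lfc_approximate_working_set_size_windows{duration_seconds="180"} 1500
`
	var parser promfmt.TextParser
	mfs, err := parser.TextToMetricFamilies(strings.NewReader(exposition))
	if err != nil {
		return nil, err
	}

	// Returns []float64{1200, 1450, 1500}: index i holds the working set size
	// (in 8 KiB pages) over an (i+1)-minute window. Missing or unevenly spaced
	// duration_seconds labels would make this return an error instead.
	return extractWorkingSetSizeWindows(mfs)
}
```
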
85 changes: 75 additions & 10 deletions pkg/agent/core/state.go
@@ -29,6 +29,7 @@ import (

"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/agent/core/revsource"
@@ -134,6 +135,8 @@ type state struct {

Metrics *SystemMetrics

LFCMetrics *LFCMetrics

// TargetRevision is the revision agent works towards.
TargetRevision vmv1.Revision

@@ -263,6 +266,7 @@ func NewState(vm api.VmInfo, config Config) *State {
CurrentRevision: vmv1.ZeroRevision,
},
Metrics: nil,
LFCMetrics: nil,
LastDesiredResources: nil,
TargetRevision: vmv1.ZeroRevision,
},
@@ -726,6 +730,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
// 2. Cap the goal CU by min/max, etc
// 3. that's it!

// Record whether we have all the metrics we'll need.
// If not, we'll later prevent downscaling so that, e.g., an autoscaler-agent restart that leaves
// us with SystemMetrics but not LFCMetrics can't cause us to flush the VM's cache.
hasAllMetrics := s.Metrics != nil && (!*s.scalingConfig().EnableLFCMetrics || s.LFCMetrics != nil)
if !hasAllMetrics {
s.warn("Making scaling decision without all required metrics available")
}

var goalCU uint32
if s.Metrics != nil {
// For CPU:
@@ -745,6 +757,48 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
memGoalCU := uint32(memGoalBytes / s.Config.ComputeUnit.Mem)

goalCU = util.Max(cpuGoalCU, memGoalCU)

}

// For LFC metrics, if enabled:
var lfcLogFields func(zapcore.ObjectEncoder) error
if s.LFCMetrics != nil {
cfg := s.scalingConfig()
wssValues := s.LFCMetrics.ApproximateworkingSetSizeBuckets
// At this point, we can assume that the values are evenly spaced, 1 minute apart, starting
// at 1 minute.
offsetIndex := *cfg.LFCMinWaitBeforeDownscaleMinutes - 1 // -1 because values start at 1m
windowSize := *cfg.LFCWindowSizeMinutes
// Handle invalid metrics:
if len(wssValues) < offsetIndex+windowSize {
s.warn("not enough working set size values to make scaling determination")
} else {
estimateWss := EstimateTrueWorkingSetSize(wssValues, WssEstimatorConfig{
MaxAllowedIncreaseFactor: 3.0, // hard-code this for now.
InitialOffset: offsetIndex,
WindowSize: windowSize,
})
projectSliceEnd := offsetIndex // start at offsetIndex to avoid panics if not monotonically non-decreasing
for ; projectSliceEnd < len(wssValues) && wssValues[projectSliceEnd] <= estimateWss; projectSliceEnd++ {
}
projectLen := 0.5 // hard-code this for now.
predictedHighestNextMinute := ProjectNextHighest(wssValues[:projectSliceEnd], projectLen)

// predictedHighestNextMinute is still in units of 8KiB pages. Let's convert that
// into GiB, then convert that into CU, and then invert the discount from only some
// of the memory going towards LFC to get the actual CU required to fit the
// predicted working set size.
requiredCU := predictedHighestNextMinute * 8192 / s.Config.ComputeUnit.Mem.AsFloat64() / *cfg.LFCToMemoryRatio
lfcGoalCU := uint32(math.Ceil(requiredCU))
goalCU = util.Max(goalCU, lfcGoalCU)

lfcLogFields = func(obj zapcore.ObjectEncoder) error {
obj.AddFloat64("estimateWssPages", estimateWss)
obj.AddFloat64("predictedNextWssPages", predictedHighestNextMinute)
obj.AddFloat64("requiredCU", requiredCU)
return nil
}
}
}

// Copy the initial value of the goal CU so that we can accurately track whether either
@@ -782,14 +836,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
}

// resources for the desired "goal" compute units
var goalResources api.Resources
goalResources := s.Config.ComputeUnit.Mul(uint16(goalCU))

// If there's no constraints and s.metrics is nil, then we'll end up with goalCU = 0.
// But if we have no metrics, we'd prefer to keep things as-is, rather than scaling down.
if s.Metrics == nil && goalCU == 0 {
goalResources = s.VM.Using()
} else {
goalResources = s.Config.ComputeUnit.Mul(uint16(goalCU))
// If we don't have all the metrics we need to make a proper decision, make sure that we aren't
// going to scale down below the current resources.
// Otherwise, we can make an under-informed decision that has undesirable impacts (e.g., scaling
// down because we don't have LFC metrics and flushing the cache because of it).
if !hasAllMetrics {
goalResources = goalResources.Max(s.VM.Using())
}

// bound goalResources by the minimum and maximum resource amounts for the VM
@@ -856,10 +910,15 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
// redundant, and we should remove one of the two.
s.LastDesiredResources = &result

s.info("Calculated desired resources",
logFields := []zap.Field{
zap.Object("current", s.VM.Using()),
zap.Object("target", result),
zap.Object("targetRevision", &s.TargetRevision))
zap.Object("targetRevision", &s.TargetRevision),
}
if lfcLogFields != nil {
logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFields)))
}
s.info("Calculated desired resources", logFields...)

return result, calculateWaitTime
}
@@ -1040,14 +1099,20 @@ func (s *State) UpdatedVM(vm api.VmInfo) {
if vm.CurrentRevision != nil {
s.internal.updateNeonVMCurrentRevision(*vm.CurrentRevision)
}

// Make sure that if LFC metrics are disabled & later enabled, we don't make decisions based on
// stale data.
if !*s.internal.scalingConfig().EnableLFCMetrics {
s.internal.LFCMetrics = nil
}
}

func (s *State) UpdateSystemMetrics(metrics SystemMetrics) {
s.internal.Metrics = &metrics
}

func (s *State) UpdateLFCMetrics(metrics LFCMetrics) {
// stub implementation, intentionally does nothing yet.
s.internal.LFCMetrics = &metrics
}

// PluginHandle provides write access to the scheduler plugin pieces of an UpdateState
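
To make the pages-to-CU conversion in the LFC branch above concrete, here is a worked example with made-up inputs; the only value taken from the code is the 8192-byte (8 KiB) page size.

```go
// Worked example of the pages -> CU conversion above. All inputs are invented
// except the 8 KiB page size, which the code relies on.
package main

import (
	"fmt"
	"math"
)

func main() {
	predictedWssPages := 300_000.0          // projected working set size, in 8 KiB pages (~2.3 GiB)
	computeUnitMemBytes := float64(4 << 30) // assumed memory per compute unit: 4 GiB
	lfcToMemoryRatio := 0.75                // from the lfcToMemoryRatio config key

	// pages -> bytes, divide by memory per CU, then divide by the LFC ratio so the
	// working set fits within only the LFC's share of the VM's memory.
	requiredCU := predictedWssPages * 8192 / computeUnitMemBytes / lfcToMemoryRatio

	fmt.Println(requiredCU)                    // ~0.76
	fmt.Println(uint32(math.Ceil(requiredCU))) // lfcGoalCU = 1
}
```
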