Skip to content

Commit

Permalink
agent/core: Use cached LFC for memory scaling signal
Browse files Browse the repository at this point in the history
In short: In addition to scaling when there's a lot of memory used by
postgres, we should also scale up to make sure that enough of the LFC is
able to fit into the page cache alongside it.

To answer "how much is enough of the LFC", we take the minimum of the
estimated working set size and the cached memory (from the 'Cached'
field of /proc/meminfo, via vector metrics).

Part of #1030. Must be deployed before the vm-monitor changes in order
to make sure we don't have worse performance for workloads that are both
memory-heavy and rely on LFC being in the VM's page cache.
  • Loading branch information
sharnoff committed Sep 19, 2024
1 parent 1edc30c commit d46f47a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 62 deletions.
1 change: 1 addition & 0 deletions deploy/agent/config_map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ data:
"defaultConfig": {
"loadAverageFractionTarget": 0.9,
"memoryUsageFractionTarget": 0.75,
"memoryTotalFractionTarget": 0.9,
"enableLFCMetrics": false,
"lfcToMemoryRatio": 0.75,
"lfcWindowSizeMinutes": 5,
Expand Down
64 changes: 44 additions & 20 deletions pkg/agent/core/goalcu.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package core
import (
"math"

"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util"
)

type scalingGoal struct {
Expand All @@ -29,26 +29,31 @@ func calculateGoalCU(
warn("Making scaling decision without all required metrics available")
}

var goalCU uint32
var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU uint32
var logFields []zap.Field

if systemMetrics != nil {
cpuGoalCU := calculateCPUGoalCU(cfg, computeUnit, *systemMetrics)
goalCU = max(goalCU, cpuGoalCU)

memGoalCU := calculateMemGoalCU(cfg, computeUnit, *systemMetrics)

goalCU = max(goalCU, memGoalCU)
}
var wss *api.Bytes // estimated working set size

if lfcMetrics != nil {
lfcGoalCU, lfcLogFunc := calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics)
goalCU = max(goalCU, lfcGoalCU)
var lfcLogFunc func(zapcore.ObjectEncoder) error
lfcGoalCU, wss, lfcLogFunc = calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics)
if lfcLogFunc != nil {
logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFunc)))
}
}

if systemMetrics != nil {
cpuGoalCU = calculateCPUGoalCU(cfg, computeUnit, *systemMetrics)

memGoalCU = calculateMemGoalCU(cfg, computeUnit, *systemMetrics)
}

if systemMetrics != nil && wss != nil {
memTotalGoalCU = calculateMemTotalGoalCU(cfg, computeUnit, *systemMetrics, *wss)
}

goalCU := max(cpuGoalCU, memGoalCU, memTotalGoalCU, lfcGoalCU)

return scalingGoal{hasAllMetrics: hasAllMetrics, goalCU: goalCU}, logFields
}

Expand All @@ -74,17 +79,34 @@ func calculateMemGoalCU(
computeUnit api.Resources,
systemMetrics SystemMetrics,
) uint32 {
// goal memory size, just looking at allocated memory (not including page cache...)
memGoalBytes := api.Bytes(math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget))
memGoalCU := uint32(memGoalBytes / computeUnit.Mem)

// note: this is equal to ceil(memGoalBytes / computeUnit.Mem), because ceil(X/M) == floor((X+M-1)/M)
memGoalCU := uint32((memGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem)
return memGoalCU
}

// goal memory size, looking at allocated memory and min(page cache usage, LFC working set size)
func calculateMemTotalGoalCU(
cfg api.ScalingConfig,
computeUnit api.Resources,
systemMetrics SystemMetrics,
wss api.Bytes,
) uint32 {
lfcCached := min(float64(wss), systemMetrics.MemoryCachedBytes)
totalGoalBytes := api.Bytes((lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget)

memTotalGoalCU := uint32((totalGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem)
return memTotalGoalCU
}

func calculateLFCGoalCU(
warn func(string),
cfg api.ScalingConfig,
computeUnit api.Resources,
lfcMetrics LFCMetrics,
) (uint32, func(zapcore.ObjectEncoder) error) {
) (uint32, *api.Bytes, func(zapcore.ObjectEncoder) error) {
wssValues := lfcMetrics.ApproximateworkingSetSizeBuckets
// At this point, we can assume that the values are equally spaced at 1 minute apart,
// starting at 1 minute.
Expand All @@ -93,7 +115,7 @@ func calculateLFCGoalCU(
// Handle invalid metrics:
if len(wssValues) < offsetIndex+windowSize {
warn("not enough working set size values to make scaling determination")
return 0, nil
return 0, nil, nil
} else {
estimateWss := EstimateTrueWorkingSetSize(wssValues, WssEstimatorConfig{
MaxAllowedIncreaseFactor: 3.0, // hard-code this for now.
Expand All @@ -107,10 +129,12 @@ func calculateLFCGoalCU(
predictedHighestNextMinute := ProjectNextHighest(wssValues[:projectSliceEnd], projectLen)

// predictedHighestNextMinute is still in units of 8KiB pages. Let's convert that
// into GiB, then convert that into CU, and then invert the discount from only some
// of the memory going towards LFC to get the actual CU required to fit the
// predicted working set size.
requiredCU := predictedHighestNextMinute * 8192 / computeUnit.Mem.AsFloat64() / *cfg.LFCToMemoryRatio
// into GiB...
estimateWssMem := predictedHighestNextMinute * 8192
// ... and then invert the discount form only some of the memory going towards LFC...
requiredMem := estimateWssMem / *cfg.LFCToMemoryRatio
// ... and then convert that into the actual CU required to fit the working set:
requiredCU := requiredMem / computeUnit.Mem.AsFloat64()
lfcGoalCU := uint32(math.Ceil(requiredCU))

lfcLogFields := func(obj zapcore.ObjectEncoder) error {
Expand All @@ -120,6 +144,6 @@ func calculateLFCGoalCU(
return nil
}

return lfcGoalCU, lfcLogFields
return lfcGoalCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields
}
}
10 changes: 7 additions & 3 deletions pkg/agent/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
)

type SystemMetrics struct {
LoadAverage1Min float64
MemoryUsageBytes float64
LoadAverage1Min float64

MemoryUsageBytes float64
MemoryCachedBytes float64
}

func (m SystemMetrics) ToAPI() api.Metrics {
Expand Down Expand Up @@ -94,11 +96,13 @@ func (m *SystemMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) e
load1 := getFloat("host_load1")
memTotal := getFloat("host_memory_total_bytes")
memAvailable := getFloat("host_memory_available_bytes")
memCached := getFloat("host_memory_cached_bytes")

tmp := SystemMetrics{
LoadAverage1Min: load1,
// Add an extra 100 MiB to account for kernel memory usage
MemoryUsageBytes: memTotal - memAvailable + 100*(1<<20),
MemoryUsageBytes: memTotal - memAvailable + 100*(1<<20),
MemoryCachedBytes: memCached,
}

if err := ec.Resolve(); err != nil {
Expand Down
92 changes: 56 additions & 36 deletions pkg/agent/core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
{
name: "BasicScaleup",
systemMetrics: &core.SystemMetrics{
LoadAverage1Min: 0.30,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.30,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
},
lfcMetrics: nil,
enableLFCMetrics: false,
Expand All @@ -56,8 +57,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
{
name: "MismatchedApprovedNoScaledown",
systemMetrics: &core.SystemMetrics{
LoadAverage1Min: 0.0, // ordinarily would like to scale down
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0, // ordinarily would like to scale down
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
},
lfcMetrics: nil,
enableLFCMetrics: false,
Expand All @@ -74,8 +76,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
// ref https://github.com/neondatabase/autoscaling/issues/512
name: "MismatchedApprovedNoScaledownButVMAtMaximum",
systemMetrics: &core.SystemMetrics{
LoadAverage1Min: 0.0, // ordinarily would like to scale down
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0, // ordinarily would like to scale down
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
},
lfcMetrics: nil,
enableLFCMetrics: false,
Expand All @@ -92,8 +95,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
{
name: "BasicLFCScaleup",
systemMetrics: &core.SystemMetrics{
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
},
lfcMetrics: &core.LFCMetrics{
CacheHitsTotal: 0.0, // unused
Expand Down Expand Up @@ -126,8 +130,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
// still scale up if load average dictates it.
name: "CanScaleUpWithoutExpectedLFCMetrics",
systemMetrics: &core.SystemMetrics{
LoadAverage1Min: 0.30,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.30,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
},
lfcMetrics: nil,
enableLFCMetrics: true,
Expand All @@ -146,8 +151,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
// bounds we can still action that.
name: "CanScaleToBoundsWithoutExpectedMetrics",
systemMetrics: &core.SystemMetrics{
LoadAverage1Min: 0.30,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.30,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
},
lfcMetrics: nil,
enableLFCMetrics: true,
Expand Down Expand Up @@ -226,6 +232,7 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
DefaultScalingConfig: api.ScalingConfig{
LoadAverageFractionTarget: lo.ToPtr(0.5),
MemoryUsageFractionTarget: lo.ToPtr(0.5),
MemoryTotalFractionTarget: lo.ToPtr(0.9),
EnableLFCMetrics: lo.ToPtr(enableLFCMetrics),
LFCToMemoryRatio: lo.ToPtr(0.75),
LFCWindowSizeMinutes: lo.ToPtr(5),
Expand Down Expand Up @@ -313,6 +320,7 @@ var DefaultInitialStateConfig = helpers.InitialStateConfig{
DefaultScalingConfig: api.ScalingConfig{
LoadAverageFractionTarget: lo.ToPtr(0.5),
MemoryUsageFractionTarget: lo.ToPtr(0.5),
MemoryTotalFractionTarget: lo.ToPtr(0.9),
EnableLFCMetrics: lo.ToPtr(false),
LFCToMemoryRatio: lo.ToPtr(0.75),
LFCWindowSizeMinutes: lo.ToPtr(5),
Expand Down Expand Up @@ -441,8 +449,9 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) {
// Set metrics
clockTick().AssertEquals(duration("0.2s"))
lastMetrics := core.SystemMetrics{
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, lastMetrics)
// double-check that we agree about the desired resources
Expand Down Expand Up @@ -539,8 +548,9 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) {

// Set metrics back so that desired resources should now be zero
lastMetrics = core.SystemMetrics{
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, lastMetrics)
// double-check that we agree about the new desired resources
Expand Down Expand Up @@ -641,8 +651,9 @@ func TestPeriodicPluginRequest(t *testing.T) {
state.Monitor().Active(true)

metrics := core.SystemMetrics{
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
resources := DefaultComputeUnit

Expand Down Expand Up @@ -730,8 +741,9 @@ func TestPartialUpscaleThenFull(t *testing.T) {
// Set metrics
clockTick()
metrics := core.SystemMetrics{
LoadAverage1Min: 1.0,
MemoryUsageBytes: 12345678,
LoadAverage1Min: 1.0,
MemoryUsageBytes: 12345678,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, metrics)

Expand Down Expand Up @@ -873,8 +885,9 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) {
// Set metrics
clockTick()
metrics := core.SystemMetrics{
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, metrics)
// double-check that we agree about the desired resources
Expand Down Expand Up @@ -1148,8 +1161,9 @@ func TestRequestedUpscale(t *testing.T) {
// Set metrics
clockTick()
lastMetrics := core.SystemMetrics{
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, lastMetrics)

Expand Down Expand Up @@ -1290,12 +1304,14 @@ func TestDownscalePivotBack(t *testing.T) {
}

initialMetrics := core.SystemMetrics{
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.0,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
newMetrics := core.SystemMetrics{
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}

steps := []struct {
Expand Down Expand Up @@ -1531,8 +1547,9 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) {

// Set metrics so the desired resources are still 2 CU
metrics := core.SystemMetrics{
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, metrics)
// Check that we agree about desired resources
Expand Down Expand Up @@ -1645,8 +1662,9 @@ func TestBoundsChangeRequiresUpscale(t *testing.T) {

// Set metrics so the desired resources are still 2 CU
metrics := core.SystemMetrics{
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, metrics)
// Check that we agree about desired resources
Expand Down Expand Up @@ -1744,8 +1762,9 @@ func TestFailedRequestRetry(t *testing.T) {
// Set metrics so that we should be trying to upscale
clockTick()
metrics := core.SystemMetrics{
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
LoadAverage1Min: 0.3,
MemoryUsageBytes: 0.0,
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, metrics)

Expand Down Expand Up @@ -1900,8 +1919,9 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) {
clockTick()
// the actual metrics we got in the actual logs
metrics := core.SystemMetrics{
LoadAverage1Min: 0.0,
MemoryUsageBytes: 150589570, // 143.6 MiB
LoadAverage1Min: 0.0,
MemoryUsageBytes: 150589570, // 143.6 MiB
MemoryCachedBytes: 0.0,
}
a.Do(state.UpdateSystemMetrics, metrics)

Expand Down
Loading

0 comments on commit d46f47a

Please sign in to comment.