diff --git a/deploy/agent/config_map.yaml b/deploy/agent/config_map.yaml index 87ee2db65..b22c30d6b 100644 --- a/deploy/agent/config_map.yaml +++ b/deploy/agent/config_map.yaml @@ -12,6 +12,7 @@ data: "defaultConfig": { "loadAverageFractionTarget": 0.9, "memoryUsageFractionTarget": 0.75, + "memoryTotalFractionTarget": 0.9, "enableLFCMetrics": false, "lfcToMemoryRatio": 0.75, "lfcWindowSizeMinutes": 5, diff --git a/pkg/agent/core/goalcu.go b/pkg/agent/core/goalcu.go index 08c1e093f..fd5785d79 100644 --- a/pkg/agent/core/goalcu.go +++ b/pkg/agent/core/goalcu.go @@ -5,11 +5,11 @@ package core import ( "math" + "github.com/samber/lo" "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/neondatabase/autoscaling/pkg/api" - "github.com/neondatabase/autoscaling/pkg/util" ) type scalingGoal struct { @@ -29,26 +29,31 @@ func calculateGoalCU( warn("Making scaling decision without all required metrics available") } - var goalCU uint32 + var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU uint32 var logFields []zap.Field - if systemMetrics != nil { - cpuGoalCU := calculateCPUGoalCU(cfg, computeUnit, *systemMetrics) - goalCU = max(goalCU, cpuGoalCU) - - memGoalCU := calculateMemGoalCU(cfg, computeUnit, *systemMetrics) - - goalCU = max(goalCU, memGoalCU) - } + var wss *api.Bytes // estimated working set size if lfcMetrics != nil { - lfcGoalCU, lfcLogFunc := calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics) - goalCU = max(goalCU, lfcGoalCU) + var lfcLogFunc func(zapcore.ObjectEncoder) error + lfcGoalCU, wss, lfcLogFunc = calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics) if lfcLogFunc != nil { logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFunc))) } } + if systemMetrics != nil { + cpuGoalCU = calculateCPUGoalCU(cfg, computeUnit, *systemMetrics) + + memGoalCU = calculateMemGoalCU(cfg, computeUnit, *systemMetrics) + } + + if systemMetrics != nil && wss != nil { + memTotalGoalCU = calculateMemTotalGoalCU(cfg, computeUnit, 
*systemMetrics, *wss) + } + + goalCU := max(cpuGoalCU, memGoalCU, memTotalGoalCU, lfcGoalCU) + return scalingGoal{hasAllMetrics: hasAllMetrics, goalCU: goalCU}, logFields } @@ -74,17 +79,34 @@ func calculateMemGoalCU( computeUnit api.Resources, systemMetrics SystemMetrics, ) uint32 { + // goal memory size, just looking at allocated memory (not including page cache...) memGoalBytes := api.Bytes(math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget)) - memGoalCU := uint32(memGoalBytes / computeUnit.Mem) + + // note: this is equal to ceil(memGoalBytes / computeUnit.Mem), because ceil(X/M) == floor((X+M-1)/M) + memGoalCU := uint32((memGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem) return memGoalCU } +// goal memory size, looking at allocated memory and min(page cache usage, LFC working set size) +func calculateMemTotalGoalCU( + cfg api.ScalingConfig, + computeUnit api.Resources, + systemMetrics SystemMetrics, + wss api.Bytes, +) uint32 { + lfcCached := min(float64(wss), systemMetrics.MemoryCachedBytes) + totalGoalBytes := api.Bytes((lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget) + + memTotalGoalCU := uint32((totalGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem) + return memTotalGoalCU +} + func calculateLFCGoalCU( warn func(string), cfg api.ScalingConfig, computeUnit api.Resources, lfcMetrics LFCMetrics, -) (uint32, func(zapcore.ObjectEncoder) error) { +) (uint32, *api.Bytes, func(zapcore.ObjectEncoder) error) { wssValues := lfcMetrics.ApproximateworkingSetSizeBuckets // At this point, we can assume that the values are equally spaced at 1 minute apart, // starting at 1 minute. 
@@ -93,7 +115,7 @@ func calculateLFCGoalCU( // Handle invalid metrics: if len(wssValues) < offsetIndex+windowSize { warn("not enough working set size values to make scaling determination") - return 0, nil + return 0, nil, nil } else { estimateWss := EstimateTrueWorkingSetSize(wssValues, WssEstimatorConfig{ MaxAllowedIncreaseFactor: 3.0, // hard-code this for now. @@ -107,10 +129,12 @@ predictedHighestNextMinute := ProjectNextHighest(wssValues[:projectSliceEnd], projectLen) // predictedHighestNextMinute is still in units of 8KiB pages. Let's convert that - // into GiB, then convert that into CU, and then invert the discount from only some - // of the memory going towards LFC to get the actual CU required to fit the - // predicted working set size. - requiredCU := predictedHighestNextMinute * 8192 / computeUnit.Mem.AsFloat64() / *cfg.LFCToMemoryRatio + // into bytes... + estimateWssMem := predictedHighestNextMinute * 8192 + // ... and then invert the discount from only some of the memory going towards LFC... + requiredMem := estimateWssMem / *cfg.LFCToMemoryRatio + // ... 
and then convert that into the actual CU required to fit the working set: + requiredCU := requiredMem / computeUnit.Mem.AsFloat64() lfcGoalCU := uint32(math.Ceil(requiredCU)) lfcLogFields := func(obj zapcore.ObjectEncoder) error { @@ -120,6 +144,6 @@ func calculateLFCGoalCU( return nil } - return lfcGoalCU, lfcLogFields + return lfcGoalCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields } } diff --git a/pkg/agent/core/metrics.go b/pkg/agent/core/metrics.go index 4187f66b5..3cb80628b 100644 --- a/pkg/agent/core/metrics.go +++ b/pkg/agent/core/metrics.go @@ -18,8 +18,10 @@ import ( ) type SystemMetrics struct { - LoadAverage1Min float64 - MemoryUsageBytes float64 + LoadAverage1Min float64 + + MemoryUsageBytes float64 + MemoryCachedBytes float64 } func (m SystemMetrics) ToAPI() api.Metrics { @@ -94,11 +96,13 @@ func (m *SystemMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) e load1 := getFloat("host_load1") memTotal := getFloat("host_memory_total_bytes") memAvailable := getFloat("host_memory_available_bytes") + memCached := getFloat("host_memory_cached_bytes") tmp := SystemMetrics{ LoadAverage1Min: load1, // Add an extra 100 MiB to account for kernel memory usage - MemoryUsageBytes: memTotal - memAvailable + 100*(1<<20), + MemoryUsageBytes: memTotal - memAvailable + 100*(1<<20), + MemoryCachedBytes: memCached, } if err := ec.Resolve(); err != nil { diff --git a/pkg/agent/core/state_test.go b/pkg/agent/core/state_test.go index 6d635c812..d975de870 100644 --- a/pkg/agent/core/state_test.go +++ b/pkg/agent/core/state_test.go @@ -40,8 +40,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { { name: "BasicScaleup", systemMetrics: &core.SystemMetrics{ - LoadAverage1Min: 0.30, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.30, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, }, lfcMetrics: nil, enableLFCMetrics: false, @@ -56,8 +57,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { { name: 
"MismatchedApprovedNoScaledown", systemMetrics: &core.SystemMetrics{ - LoadAverage1Min: 0.0, // ordinarily would like to scale down - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, // ordinarily would like to scale down + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, }, lfcMetrics: nil, enableLFCMetrics: false, @@ -74,8 +76,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { // ref https://github.com/neondatabase/autoscaling/issues/512 name: "MismatchedApprovedNoScaledownButVMAtMaximum", systemMetrics: &core.SystemMetrics{ - LoadAverage1Min: 0.0, // ordinarily would like to scale down - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, // ordinarily would like to scale down + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, }, lfcMetrics: nil, enableLFCMetrics: false, @@ -92,8 +95,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { { name: "BasicLFCScaleup", systemMetrics: &core.SystemMetrics{ - LoadAverage1Min: 0.0, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, }, lfcMetrics: &core.LFCMetrics{ CacheHitsTotal: 0.0, // unused @@ -126,8 +130,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { // still scale up if load average dictates it. name: "CanScaleUpWithoutExpectedLFCMetrics", systemMetrics: &core.SystemMetrics{ - LoadAverage1Min: 0.30, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.30, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, }, lfcMetrics: nil, enableLFCMetrics: true, @@ -146,8 +151,9 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { // bounds we can still action that. 
name: "CanScaleToBoundsWithoutExpectedMetrics", systemMetrics: &core.SystemMetrics{ - LoadAverage1Min: 0.30, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.30, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, }, lfcMetrics: nil, enableLFCMetrics: true, @@ -226,6 +232,7 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { DefaultScalingConfig: api.ScalingConfig{ LoadAverageFractionTarget: lo.ToPtr(0.5), MemoryUsageFractionTarget: lo.ToPtr(0.5), + MemoryTotalFractionTarget: lo.ToPtr(0.9), EnableLFCMetrics: lo.ToPtr(enableLFCMetrics), LFCToMemoryRatio: lo.ToPtr(0.75), LFCWindowSizeMinutes: lo.ToPtr(5), @@ -313,6 +320,7 @@ var DefaultInitialStateConfig = helpers.InitialStateConfig{ DefaultScalingConfig: api.ScalingConfig{ LoadAverageFractionTarget: lo.ToPtr(0.5), MemoryUsageFractionTarget: lo.ToPtr(0.5), + MemoryTotalFractionTarget: lo.ToPtr(0.9), EnableLFCMetrics: lo.ToPtr(false), LFCToMemoryRatio: lo.ToPtr(0.75), LFCWindowSizeMinutes: lo.ToPtr(5), @@ -441,8 +449,9 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { // Set metrics clockTick().AssertEquals(duration("0.2s")) lastMetrics := core.SystemMetrics{ - LoadAverage1Min: 0.3, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.3, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, lastMetrics) // double-check that we agree about the desired resources @@ -539,8 +548,9 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) { // Set metrics back so that desired resources should now be zero lastMetrics = core.SystemMetrics{ - LoadAverage1Min: 0.0, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, lastMetrics) // double-check that we agree about the new desired resources @@ -641,8 +651,9 @@ func TestPeriodicPluginRequest(t *testing.T) { state.Monitor().Active(true) metrics := core.SystemMetrics{ - LoadAverage1Min: 0.0, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, + MemoryUsageBytes: 
0.0, + MemoryCachedBytes: 0.0, } resources := DefaultComputeUnit @@ -730,8 +741,9 @@ func TestPartialUpscaleThenFull(t *testing.T) { // Set metrics clockTick() metrics := core.SystemMetrics{ - LoadAverage1Min: 1.0, - MemoryUsageBytes: 12345678, + LoadAverage1Min: 1.0, + MemoryUsageBytes: 12345678, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, metrics) @@ -873,8 +885,9 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) { // Set metrics clockTick() metrics := core.SystemMetrics{ - LoadAverage1Min: 0.0, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, metrics) // double-check that we agree about the desired resources @@ -1148,8 +1161,9 @@ func TestRequestedUpscale(t *testing.T) { // Set metrics clockTick() lastMetrics := core.SystemMetrics{ - LoadAverage1Min: 0.0, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, lastMetrics) @@ -1290,12 +1304,14 @@ func TestDownscalePivotBack(t *testing.T) { } initialMetrics := core.SystemMetrics{ - LoadAverage1Min: 0.0, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.0, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } newMetrics := core.SystemMetrics{ - LoadAverage1Min: 0.3, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.3, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } steps := []struct { @@ -1531,8 +1547,9 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) { // Set metrics so the desired resources are still 2 CU metrics := core.SystemMetrics{ - LoadAverage1Min: 0.3, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.3, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, metrics) // Check that we agree about desired resources @@ -1645,8 +1662,9 @@ func TestBoundsChangeRequiresUpscale(t *testing.T) { // Set metrics so the desired resources are still 2 CU metrics := core.SystemMetrics{ - LoadAverage1Min: 
0.3, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.3, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, metrics) // Check that we agree about desired resources @@ -1744,8 +1762,9 @@ func TestFailedRequestRetry(t *testing.T) { // Set metrics so that we should be trying to upscale clockTick() metrics := core.SystemMetrics{ - LoadAverage1Min: 0.3, - MemoryUsageBytes: 0.0, + LoadAverage1Min: 0.3, + MemoryUsageBytes: 0.0, + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, metrics) @@ -1900,8 +1919,9 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) { clockTick() // the actual metrics we got in the actual logs metrics := core.SystemMetrics{ - LoadAverage1Min: 0.0, - MemoryUsageBytes: 150589570, // 143.6 MiB + LoadAverage1Min: 0.0, + MemoryUsageBytes: 150589570, // 143.6 MiB + MemoryCachedBytes: 0.0, } a.Do(state.UpdateSystemMetrics, metrics) diff --git a/pkg/api/vminfo.go b/pkg/api/vminfo.go index a26ad30ff..e50c5f303 100644 --- a/pkg/api/vminfo.go +++ b/pkg/api/vminfo.go @@ -328,14 +328,24 @@ type ScalingConfig struct { // this field is left out the settings will fall back on the global default. LoadAverageFractionTarget *float64 `json:"loadAverageFractionTarget,omitempty"` - // MemoryUsageFractionTarget sets the desired fraction of current memory that - // we would like to be using. For example, with a value of 0.7, on a 4GB VM - // we'd like to be using 2.8GB of memory. + // MemoryUsageFractionTarget sets the maximum fraction of total memory that postgres allocations + // (MemoryUsage) must fit into. This doesn't count the LFC memory. + // This memory may also be viewed as "unreclaimable" (contrary to e.g. page cache). + // + // For example, with a value of 0.75 on a 4GiB VM, we will try to upscale if the unreclaimable + // memory usage exceeds 3GiB. // // When specifying the autoscaler-agent config, this field is required. 
For an individual VM, if // this field is left out the settings will fall back on the global default. MemoryUsageFractionTarget *float64 `json:"memoryUsageFractionTarget,omitempty"` + // MemoryTotalFractionTarget sets the maximum fraction of total memory that postgres allocations + // PLUS LFC memory (MemoryUsage + MemoryCached) must fit into. + // + // Compared with MemoryUsageFractionTarget, this value can be set higher (e.g. 0.9 vs 0.75), + // because we can tolerate a higher fraction of consumption for both in-VM memory consumers. + MemoryTotalFractionTarget *float64 `json:"memoryTotalFractionTarget,omitempty"` + // EnableLFCMetrics, if true, enables fetching additional metrics about the Local File Cache // (LFC) to provide as input to the scaling algorithm. // @@ -376,6 +386,9 @@ func (defaults ScalingConfig) WithOverrides(overrides *ScalingConfig) ScalingCon if overrides.MemoryUsageFractionTarget != nil { defaults.MemoryUsageFractionTarget = lo.ToPtr(*overrides.MemoryUsageFractionTarget) } + if overrides.MemoryTotalFractionTarget != nil { + defaults.MemoryTotalFractionTarget = lo.ToPtr(*overrides.MemoryTotalFractionTarget) + } if overrides.EnableLFCMetrics != nil { defaults.EnableLFCMetrics = lo.ToPtr(*overrides.EnableLFCMetrics) } @@ -428,6 +441,13 @@ func (c *ScalingConfig) validate(requireAll bool) error { } else if requireAll { ec.Add(fmt.Errorf("%s is a required field", ".memoryUsageFractionTarget")) } + // Make sure c.MemoryTotalFractionTarget is between 0 and 1 + if c.MemoryTotalFractionTarget != nil { + erc.Whenf(ec, *c.MemoryTotalFractionTarget < 0.0, "%s must be set to value >= 0", ".memoryTotalFractionTarget") + erc.Whenf(ec, *c.MemoryTotalFractionTarget >= 1.0, "%s must be set to value < 1", ".memoryTotalFractionTarget") + } else if requireAll { + ec.Add(fmt.Errorf("%s is a required field", ".memoryTotalFractionTarget")) + } if requireAll { erc.Whenf(ec, c.EnableLFCMetrics == nil, "%s is a required field", ".enableLFCMetrics")