diff --git a/autoscaler-agent/config_map.yaml b/autoscaler-agent/config_map.yaml index b22c30d6b..727004521 100644 --- a/autoscaler-agent/config_map.yaml +++ b/autoscaler-agent/config_map.yaml @@ -26,6 +26,13 @@ data: "accumulateEverySeconds": 24, "clients": {} }, + "scalingEvents": { + "cuMultiplier": 0.25, + "rereportThreshold": 0.25, + "clusterName": "replaceme", + "regionName": "replaceme", + "clients": {} + }, "monitor": { "serverPort": 10301, "responseTimeoutSeconds": 5, diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 254e7f6ae..93d219225 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -8,6 +8,7 @@ import ( "github.com/tychoish/fun/erc" "github.com/neondatabase/autoscaling/pkg/agent/billing" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/reporting" ) @@ -15,12 +16,14 @@ import ( type Config struct { RefreshStateIntervalSeconds uint `json:"refereshStateIntervalSeconds"` + Billing billing.Config `json:"billing"` + ScalingEvents scalingevents.Config `json:"scalingEvents"` + Scaling ScalingConfig `json:"scaling"` Metrics MetricsConfig `json:"metrics"` Scheduler SchedulerConfig `json:"scheduler"` Monitor MonitorConfig `json:"monitor"` NeonVM NeonVMConfig `json:"neonvm"` - Billing billing.Config `json:"billing"` DumpState *DumpStateConfig `json:"dumpState"` } @@ -193,6 +196,18 @@ func (c *Config) validate() error { erc.Whenf(ec, c.Billing.Clients.S3.Region == "", emptyTmpl, ".billing.clients.s3.region") erc.Whenf(ec, c.Billing.Clients.S3.PrefixInBucket == "", emptyTmpl, ".billing.clients.s3.prefixInBucket") } + + erc.Whenf(ec, c.ScalingEvents.CUMultiplier == 0, zeroTmpl, ".scalingEvents.cuMultiplier") + erc.Whenf(ec, c.ScalingEvents.RereportThreshold == 0, zeroTmpl, ".scalingEvents.rereportThreshold") + erc.Whenf(ec, c.ScalingEvents.ClusterName == "", emptyTmpl, ".scalingEvents.clusterName") + erc.Whenf(ec, c.ScalingEvents.RegionName == "", emptyTmpl, ".scalingEvents.regionName") + if c.ScalingEvents.Clients.S3 != nil { + validateBaseReportingConfig(&c.ScalingEvents.Clients.S3.BaseClientConfig, "scalingEvents.clients.s3") + erc.Whenf(ec, c.ScalingEvents.Clients.S3.Bucket == "", emptyTmpl, ".scalingEvents.clients.s3.bucket") + erc.Whenf(ec, c.ScalingEvents.Clients.S3.Region == "", emptyTmpl, ".scalingEvents.clients.s3.region") + erc.Whenf(ec, c.ScalingEvents.Clients.S3.PrefixInBucket == "", emptyTmpl, ".scalingEvents.clients.s3.prefixInBucket") + } + erc.Whenf(ec, c.DumpState != nil && c.DumpState.Port == 0, zeroTmpl, ".dumpState.port") erc.Whenf(ec, c.DumpState != nil && c.DumpState.TimeoutSeconds == 0, zeroTmpl, ".dumpState.timeoutSeconds") diff --git a/pkg/agent/core/goalcu.go b/pkg/agent/core/goalcu.go index fd5785d79..8c733a66c 100644 --- a/pkg/agent/core/goalcu.go +++ b/pkg/agent/core/goalcu.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/api" ) @@ -19,6 +20,7 @@ type scalingGoal struct { func calculateGoalCU( warn func(string), + report func(goalCU uint32, parts scalingevents.GoalCUComponents), cfg api.ScalingConfig, computeUnit api.Resources, systemMetrics *SystemMetrics, @@ -29,14 +31,16 @@ func calculateGoalCU( warn("Making scaling decision without all required metrics available") } - var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU uint32 + var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU float64 var logFields []zap.Field + var reportedGoals scalingevents.GoalCUComponents var wss *api.Bytes // estimated working set size if lfcMetrics != nil { var lfcLogFunc func(zapcore.ObjectEncoder) error lfcGoalCU, wss, lfcLogFunc = calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics) + reportedGoals.LFC = lo.ToPtr(lfcGoalCU) if lfcLogFunc != nil { logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFunc))) } @@ -44,15 +48,27 @@ func calculateGoalCU( if systemMetrics != nil { cpuGoalCU = calculateCPUGoalCU(cfg, computeUnit, *systemMetrics) + reportedGoals.CPU = lo.ToPtr(cpuGoalCU) memGoalCU = calculateMemGoalCU(cfg, computeUnit, *systemMetrics) + reportedGoals.Mem = lo.ToPtr(memGoalCU) } if systemMetrics != nil && wss != nil { memTotalGoalCU = calculateMemTotalGoalCU(cfg, computeUnit, *systemMetrics, *wss) + reportedGoals.Mem = lo.ToPtr(max(*reportedGoals.Mem, memTotalGoalCU)) } - goalCU := max(cpuGoalCU, memGoalCU, memTotalGoalCU, lfcGoalCU) + goalCU := uint32(math.Ceil(max( + math.Round(cpuGoalCU), // for historical compatibility, use round() instead of ceil() + memGoalCU, + memTotalGoalCU, + lfcGoalCU, + ))) + if hasAllMetrics { + // Report this information, for scaling metrics. + report(goalCU, reportedGoals) + } return scalingGoal{hasAllMetrics: hasAllMetrics, goalCU: goalCU}, logFields } @@ -64,10 +80,9 @@ func calculateCPUGoalCU( cfg api.ScalingConfig, computeUnit api.Resources, systemMetrics SystemMetrics, -) uint32 { +) float64 { goalCPUs := systemMetrics.LoadAverage1Min / *cfg.LoadAverageFractionTarget - cpuGoalCU := uint32(math.Round(goalCPUs / computeUnit.VCPU.AsFloat64())) - return cpuGoalCU + return goalCPUs / computeUnit.VCPU.AsFloat64() } // For Mem: @@ -78,13 +93,11 @@ func calculateMemGoalCU( cfg api.ScalingConfig, computeUnit api.Resources, systemMetrics SystemMetrics, -) uint32 { +) float64 { // goal memory size, just looking at allocated memory (not including page cache...) - memGoalBytes := api.Bytes(math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget)) + memGoalBytes := math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget) - // note: this is equal to ceil(memGoalBytes / computeUnit.Mem), because ceil(X/M) == floor((X+M-1)/M) - memGoalCU := uint32((memGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem) - return memGoalCU + return memGoalBytes / float64(computeUnit.Mem) } // goal memory size, looking at allocated memory and min(page cache usage, LFC working set size) @@ -93,12 +106,11 @@ func calculateMemTotalGoalCU( computeUnit api.Resources, systemMetrics SystemMetrics, wss api.Bytes, -) uint32 { +) float64 { lfcCached := min(float64(wss), systemMetrics.MemoryCachedBytes) - totalGoalBytes := api.Bytes((lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget) + totalGoalBytes := (lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget - memTotalGoalCU := uint32((totalGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem) - return memTotalGoalCU + return totalGoalBytes / float64(computeUnit.Mem) } func calculateLFCGoalCU( @@ -106,7 +118,7 @@ func calculateLFCGoalCU( cfg api.ScalingConfig, computeUnit api.Resources, lfcMetrics LFCMetrics, -) (uint32, *api.Bytes, func(zapcore.ObjectEncoder) error) { +) (float64, *api.Bytes, func(zapcore.ObjectEncoder) error) { wssValues := lfcMetrics.ApproximateworkingSetSizeBuckets // At this point, we can assume that the values are equally spaced at 1 minute apart, // starting at 1 minute. @@ -135,7 +147,6 @@ func calculateLFCGoalCU( requiredMem := estimateWssMem / *cfg.LFCToMemoryRatio // ... and then convert that into the actual CU required to fit the working set: requiredCU := requiredMem / computeUnit.Mem.AsFloat64() - lfcGoalCU := uint32(math.Ceil(requiredCU)) lfcLogFields := func(obj zapcore.ObjectEncoder) error { obj.AddFloat64("estimateWssPages", estimateWss) @@ -144,6 +155,6 @@ func calculateLFCGoalCU( return nil } - return lfcGoalCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields + return requiredCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields } } diff --git a/pkg/agent/core/state.go b/pkg/agent/core/state.go index b7db040dd..114923694 100644 --- a/pkg/agent/core/state.go +++ b/pkg/agent/core/state.go @@ -31,6 +31,7 @@ import ( vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/api" ) @@ -38,8 +39,14 @@ type ObservabilityCallbacks struct { PluginLatency revsource.ObserveCallback MonitorLatency revsource.ObserveCallback NeonVMLatency revsource.ObserveCallback + + ScalingEvent ReportScalingEventCallback + DesiredScaling ReportDesiredScalingCallback } +type ReportScalingEventCallback func(timestamp time.Time, current uint32, target uint32) +type ReportDesiredScalingCallback func(timestamp time.Time, current uint32, target uint32, parts scalingevents.GoalCUComponents) + type RevisionSource interface { Next(ts time.Time, flags vmv1.Flag) vmv1.Revision Observe(moment time.Time, rev vmv1.Revision) error @@ -727,8 +734,20 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) ( // 2. Cap the goal CU by min/max, etc // 3. that's it! + reportGoals := func(goalCU uint32, parts scalingevents.GoalCUComponents) { + currentCU, ok := s.VM.Using().DivResources(s.Config.ComputeUnit) + if !ok { + return // skip reporting if the current CU is not right. + } + + if report := s.Config.ObservabilityCallbacks.DesiredScaling; report != nil { + report(now, uint32(currentCU), goalCU, parts) + } + } + sg, goalCULogFields := calculateGoalCU( s.warn, + reportGoals, s.scalingConfig(), s.Config.ComputeUnit, s.Metrics, @@ -1220,6 +1239,15 @@ func (s *State) NeonVM() NeonVMHandle { } func (h NeonVMHandle) StartingRequest(now time.Time, resources api.Resources) { + if report := h.s.Config.ObservabilityCallbacks.ScalingEvent; report != nil { + currentCU, currentOk := h.s.VM.Using().DivResources(h.s.Config.ComputeUnit) + targetCU, targetOk := resources.DivResources(h.s.Config.ComputeUnit) + + if currentOk && targetOk { + report(now, uint32(currentCU), uint32(targetCU)) + } + } + // FIXME: add time to ongoing request info (or maybe only in RequestFailed?) h.s.NeonVM.OngoingRequested = &resources } diff --git a/pkg/agent/core/state_test.go b/pkg/agent/core/state_test.go index d975de870..1e5b6cd06 100644 --- a/pkg/agent/core/state_test.go +++ b/pkg/agent/core/state_test.go @@ -222,6 +222,8 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { AlwaysMigrate: false, ScalingEnabled: true, ScalingConfig: nil, + ReportScalingEvents: false, + ReportDesiredScaling: false, }, CurrentRevision: nil, } @@ -257,6 +259,8 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) { PluginLatency: nil, MonitorLatency: nil, NeonVMLatency: nil, + ScalingEvent: nil, + DesiredScaling: nil, }, } } @@ -342,6 +346,8 @@ var DefaultInitialStateConfig = helpers.InitialStateConfig{ PluginLatency: nil, MonitorLatency: nil, NeonVMLatency: nil, + ScalingEvent: nil, + DesiredScaling: nil, }, }, } diff --git a/pkg/agent/core/testhelpers/construct.go b/pkg/agent/core/testhelpers/construct.go index a97e8a10b..95a2a4b66 100644 --- a/pkg/agent/core/testhelpers/construct.go +++ b/pkg/agent/core/testhelpers/construct.go @@ -85,6 +85,8 @@ func CreateVmInfo(config InitialVmInfoConfig, opts ...VmInfoOpt) api.VmInfo { AlwaysMigrate: false, ScalingConfig: nil, ScalingEnabled: true, + ReportScalingEvents: false, + ReportDesiredScaling: false, }, CurrentRevision: nil, } diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index 38e07cec2..ae315161f 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -11,6 +11,7 @@ import ( vmclient "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned" "github.com/neondatabase/autoscaling/pkg/agent/billing" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/util" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" @@ -51,7 +52,13 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { } defer schedTracker.Stop() - globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker) + scalingEventsMetrics := scalingevents.NewPromMetrics() + scalingReporter, err := scalingevents.NewReporter(ctx, logger, &r.Config.ScalingEvents, scalingEventsMetrics) + if err != nil { + return fmt.Errorf("Error creating scaling events reporter: %w", err) + } + + globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker, scalingReporter) watchMetrics.MustRegister(globalPromReg) logger.Info("Starting billing metrics collector") diff --git a/pkg/agent/globalstate.go b/pkg/agent/globalstate.go index 3342a7cf7..69be386ce 100644 --- a/pkg/agent/globalstate.go +++ b/pkg/agent/globalstate.go @@ -17,6 +17,7 @@ import ( vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" vmclient "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/util" @@ -40,12 +41,15 @@ type agentState struct { vmClient *vmclient.Clientset schedTracker *schedwatch.SchedulerTracker metrics GlobalMetrics + + scalingReporter *scalingevents.Reporter } func (r MainRunner) newAgentState( baseLogger *zap.Logger, podIP string, schedTracker *schedwatch.SchedulerTracker, + scalingReporter *scalingevents.Reporter, ) (*agentState, *prometheus.Registry) { metrics, promReg := makeGlobalMetrics() @@ -59,6 +63,8 @@ func (r MainRunner) newAgentState( podIP: podIP, schedTracker: schedTracker, metrics: metrics, + + scalingReporter: scalingReporter, } return state, promReg diff --git a/pkg/agent/runner.go b/pkg/agent/runner.go index 7d0778922..ccdbf956a 100644 --- a/pkg/agent/runner.go +++ b/pkg/agent/runner.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "runtime/debug" "strconv" @@ -36,6 +37,7 @@ import ( "github.com/neondatabase/autoscaling/pkg/agent/core" "github.com/neondatabase/autoscaling/pkg/agent/core/revsource" "github.com/neondatabase/autoscaling/pkg/agent/executor" + "github.com/neondatabase/autoscaling/pkg/agent/scalingevents" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/util" @@ -195,6 +197,11 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util if vmInfo.CurrentRevision != nil { initialRevision = vmInfo.CurrentRevision.Value } + // "dsrl" stands for "desired scaling report limiter" -- helper to avoid spamming events. + dsrl := &desiredScalingReportLimiter{ + lastTarget: nil, + lastParts: nil, + } revisionSource := revsource.NewRevisionSource(initialRevision, WrapHistogramVec(&r.global.metrics.scalingLatency)) executorCore := executor.NewExecutorCore(coreExecLogger, vmInfo, executor.Config{ OnNextActions: r.global.metrics.runnerNextActions.Inc, @@ -217,6 +224,10 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util PluginLatency: WrapHistogramVec(&r.global.metrics.pluginLatency), MonitorLatency: WrapHistogramVec(&r.global.metrics.monitorLatency), NeonVMLatency: WrapHistogramVec(&r.global.metrics.neonvmLatency), + ScalingEvent: r.reportScalingEvent, + DesiredScaling: func(ts time.Time, current, target uint32, parts scalingevents.GoalCUComponents) { + r.reportDesiredScaling(dsrl, ts, current, target, parts) + }, }, }, }) @@ -322,6 +333,102 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util } } +func (r *Runner) reportScalingEvent(timestamp time.Time, currentCU, targetCU uint32) { + var endpointID string + + enabled := func() bool { + r.status.mu.Lock() + defer r.status.mu.Unlock() + + endpointID = r.status.endpointID + return endpointID != "" && r.status.vmInfo.Config.ReportScalingEvents + }() + if !enabled { + return + } + + reporter := r.global.scalingReporter + reporter.Submit(reporter.NewRealEvent( + timestamp, + endpointID, + currentCU, + targetCU, + )) +} + +func (r *Runner) reportDesiredScaling( + rl *desiredScalingReportLimiter, + timestamp time.Time, + currentCU uint32, + targetCU uint32, + parts scalingevents.GoalCUComponents, +) { + var endpointID string + + enabled := func() bool { + r.status.mu.Lock() + defer r.status.mu.Unlock() + + endpointID = r.status.endpointID + return endpointID != "" && r.status.vmInfo.Config.ReportDesiredScaling + }() + if !enabled { + return + } + + // TODO: Use this opportunity to report the desired scaling in the per-VM + // metrics. + + rl.report(r.global.scalingReporter, timestamp, endpointID, currentCU, targetCU, parts) +} + +type desiredScalingReportLimiter struct { + lastTarget *uint32 + lastParts *scalingevents.GoalCUComponents +} + +func (rl *desiredScalingReportLimiter) report( + reporter *scalingevents.Reporter, + timestamp time.Time, + endpointID string, + currentCU uint32, + targetCU uint32, + parts scalingevents.GoalCUComponents, +) { + closeEnough := func(x *float64, y *float64) bool { + if (x != nil) != (y != nil) { + return false + } else if x == nil /* && y == nil */ { + return true + } else { + // true iff x and y are within the threshold of each other + return math.Abs(*x-*y) < 0.25 + } + } + + // Check if we should skip this time. + if rl.lastTarget != nil && rl.lastParts != nil { + skip := *rl.lastTarget == targetCU && + closeEnough(rl.lastParts.CPU, parts.CPU) && + closeEnough(rl.lastParts.Mem, parts.Mem) && + closeEnough(rl.lastParts.LFC, parts.LFC) + if skip { + return + } + } + + // Not skipping. + rl.lastTarget = &targetCU + rl.lastParts = &parts + reporter.Submit(reporter.NewHypotheticalEvent( + timestamp, + endpointID, + currentCU, + targetCU, + parts, + )) +} + ////////////////////// // Background tasks // ////////////////////// diff --git a/pkg/agent/scalingevents/clients.go b/pkg/agent/scalingevents/clients.go new file mode 100644 index 000000000..de1d8e98c --- /dev/null +++ b/pkg/agent/scalingevents/clients.go @@ -0,0 +1,64 @@ +package scalingevents + +import ( + "context" + "fmt" + "time" + + "github.com/lithammer/shortuuid" + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/reporting" +) + +type ClientsConfig struct { + S3 *S3ClientConfig `json:"s3"` +} + +type S3ClientConfig struct { + reporting.BaseClientConfig + reporting.S3ClientConfig + PrefixInBucket string `json:"prefixInBucket"` +} + +type eventsClient = reporting.Client[ScalingEvent] + +func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) ([]eventsClient, error) { + var clients []eventsClient + + if c := cfg.S3; c != nil { + generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket) + client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey) + if err != nil { + return nil, fmt.Errorf("error creating S3 client: %w", err) + } + logger.Info("Created S3 client for scaling events", zap.Any("config", c)) + + clients = append(clients, eventsClient{ + Name: "s3", + Base: client, + BaseConfig: c.BaseClientConfig, + GenerateTraceID: shortuuid.New, + SerializeBatch: reporting.WrapSerialize[ScalingEvent](reporting.GZIPCompress, reporting.JSONLinesMarshalBatch), + }) + } + + return clients, nil +} + +// Returns a function to generate keys for the placement of scaling events data into blob storage. +// +// Example: prefix/2024/10/31/23/events_{uuid}.ndjson.gz (11pm on halloween, UTC) +func newBlobStorageKeyGenerator(prefix string) func() string { + return func() string { + now := time.Now().UTC() + id := shortuuid.New() + + return fmt.Sprintf( + "%s/%d/%02d/%02d/%02d/events_%s.ndjson.gz", + prefix, + now.Year(), now.Month(), now.Day(), now.Hour(), + id, + ) + } +} diff --git a/pkg/agent/scalingevents/prommetrics.go b/pkg/agent/scalingevents/prommetrics.go new file mode 100644 index 000000000..d2b827803 --- /dev/null +++ b/pkg/agent/scalingevents/prommetrics.go @@ -0,0 +1,29 @@ +package scalingevents + +// Prometheus metrics for the agent's scaling event reporting subsystem + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/neondatabase/autoscaling/pkg/reporting" +) + +type PromMetrics struct { + reporting *reporting.EventSinkMetrics + totalCount prometheus.Gauge +} + +func NewPromMetrics() PromMetrics { + return PromMetrics{ + reporting: reporting.NewEventSinkMetrics("autoscaling_agent_events"), + totalCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "autoscaling_agent_scaling_events_total", + Help: "Total number of scaling events generated", + }), + } +} + +func (m PromMetrics) MustRegister(reg *prometheus.Registry) { + m.reporting.MustRegister(reg) + reg.MustRegister(m.totalCount) +} diff --git a/pkg/agent/scalingevents/reporter.go b/pkg/agent/scalingevents/reporter.go new file mode 100644 index 000000000..a7120be6c --- /dev/null +++ b/pkg/agent/scalingevents/reporter.go @@ -0,0 +1,142 @@ +package scalingevents + +import ( + "context" + "math" + "time" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/reporting" +) + +type Config struct { + // CUMultiplier sets the ratio between our internal compute unit and the one that should be + // reported. + // + // This exists because Neon allows fractional compute units, while the autoscaler-agent acts on + // integer multiples of a smaller compute unit. + CUMultiplier float64 `json:"cuMultiplier"` + + // RereportThreshold sets the minimum amount of change in desired compute units required for us to + // re-report the desired scaling. + RereportThreshold float64 `json:"rereportThreshold"` + + ClusterName string `json:"clusterName"` + RegionName string `json:"regionName"` + + Clients ClientsConfig `json:"clients"` +} + +type Reporter struct { + conf *Config + sink *reporting.EventSink[ScalingEvent] + metrics PromMetrics +} + +type ScalingEvent struct { + Timestamp time.Time `json:"timestamp"` + Region string `json:"region"` + Cluster string `json:"cluster"` + EndpointID string `json:"endpoint_id"` + Type scalingEventType `json:"type"` + CurrentMilliCU uint32 `json:"current_cu"` + TargetMilliCU uint32 `json:"target_cu"` + GoalComponents *GoalCUComponents `json:"goalComponents,omitempty"` +} + +type GoalCUComponents struct { + CPU *float64 `json:"cpu,omitempty"` + Mem *float64 `json:"mem,omitempty"` + LFC *float64 `json:"lfc,omitempty"` +} + +type scalingEventType string + +const ( + scalingEventReal = "real" + scalingEventHypothetical = "hypothetical" +) + +func NewReporter( + ctx context.Context, + parentLogger *zap.Logger, + conf *Config, + metrics PromMetrics, +) (*Reporter, error) { + logger := parentLogger.Named("scalingevents") + + clients, err := createClients(ctx, logger, conf.Clients) + if err != nil { + return nil, err + } + + sink := reporting.NewEventSink(logger, metrics.reporting, clients...) + + return &Reporter{ + conf: conf, + sink: sink, + metrics: metrics, + }, nil +} + +// Submit adds the ScalingEvent to the sender queue(s), returning without waiting for it to be sent. +func (r *Reporter) Submit(event ScalingEvent) { + r.sink.Enqueue(event) +} + +func convertToMilliCU(cu uint32, multiplier float64) uint32 { + return uint32(math.Round(1000 * float64(cu) * multiplier)) +} + +// NewRealEvent is a helper function to create a ScalingEvent for actual scaling that has occurred. +// +// This method also handles compute unit translation. +func (r *Reporter) NewRealEvent( + timestamp time.Time, + endpointID string, + currentCU uint32, + targetCU uint32, +) ScalingEvent { + return ScalingEvent{ + Timestamp: timestamp, + Region: r.conf.RegionName, + Cluster: r.conf.ClusterName, + EndpointID: endpointID, + Type: scalingEventReal, + CurrentMilliCU: convertToMilliCU(currentCU, r.conf.CUMultiplier), + TargetMilliCU: convertToMilliCU(targetCU, r.conf.CUMultiplier), + GoalComponents: nil, + } +} + +func (r *Reporter) NewHypotheticalEvent( + timestamp time.Time, + endpointID string, + currentCU uint32, + targetCU uint32, + goalCUs GoalCUComponents, +) ScalingEvent { + convertFloat := func(cu *float64) *float64 { + if cu != nil { + return lo.ToPtr(*cu * r.conf.CUMultiplier) + } + return nil + } + + return ScalingEvent{ + Timestamp: timestamp, + Region: r.conf.RegionName, + Cluster: r.conf.ClusterName, + EndpointID: endpointID, + Type: scalingEventHypothetical, + CurrentMilliCU: convertToMilliCU(currentCU, r.conf.CUMultiplier), + TargetMilliCU: convertToMilliCU(targetCU, r.conf.CUMultiplier), + GoalComponents: &GoalCUComponents{ + CPU: convertFloat(goalCUs.CPU), + Mem: convertFloat(goalCUs.Mem), + LFC: convertFloat(goalCUs.LFC), + }, + } +} diff --git a/pkg/api/types.go b/pkg/api/types.go index d71570def..9cf6158a4 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -381,6 +381,23 @@ func (r Resources) Mul(factor uint16) Resources { } } +// DivResources divides the resources by the smaller amount, returning the uint16 value such that +// other.Mul(factor) is equal to the original resources. +// +// If r is not an integer multiple of other, then (0, false) will be returned. +func (r Resources) DivResources(other Resources) (uint16, bool) { + cpuFactor := uint16(r.VCPU / other.VCPU) + cpuOk := r.VCPU%other.VCPU == 0 + memFactor := uint16(r.Mem / other.Mem) + memOk := r.Mem%other.Mem == 0 + + if !cpuOk || !memOk || cpuFactor != memFactor { + return 0, false + } + + return cpuFactor, true // already known equal to memFactor +} + // AbsDiff returns a new Resources with each field F as the absolute value of the difference between // r.F and cmp.F func (r Resources) AbsDiff(cmp Resources) Resources { diff --git a/pkg/api/vminfo.go b/pkg/api/vminfo.go index e50c5f303..9a936415b 100644 --- a/pkg/api/vminfo.go +++ b/pkg/api/vminfo.go @@ -26,6 +26,10 @@ const ( AnnotationAutoscalingBounds = "autoscaling.neon.tech/bounds" AnnotationAutoscalingConfig = "autoscaling.neon.tech/config" AnnotationBillingEndpointID = "autoscaling.neon.tech/billing-endpoint-id" + + // ref cloud#15939; to be removed after rollout is complete. + LabelReportScalingEvents = "autoscaling.neon.tech/report-scaling-events" + LabelReportDesiredScaling = "autoscaling.neon.tech/report-desired-scaling" ) func hasTrueLabel(obj metav1.ObjectMetaAccessor, labelName string) bool { @@ -49,6 +53,14 @@ func HasAlwaysMigrateLabel(obj metav1.ObjectMetaAccessor) bool { return hasTrueLabel(obj, LabelTestingOnlyAlwaysMigrate) } +func HasReportScalingEventsLabel(obj metav1.ObjectMetaAccessor) bool { + return hasTrueLabel(obj, LabelReportScalingEvents) +} + +func HasReportDesiredScalingLabel(obj metav1.ObjectMetaAccessor) bool { + return hasTrueLabel(obj, LabelReportDesiredScaling) +} + // VmInfo is the subset of vmapi.VirtualMachineSpec that the scheduler plugin and autoscaler agent // care about. It takes various labels and annotations into account, so certain fields might be // different from what's strictly in the VirtualMachine object. @@ -113,6 +125,9 @@ type VmConfig struct { AlwaysMigrate bool `json:"alwaysMigrate"` ScalingEnabled bool `json:"scalingEnabled"` ScalingConfig *ScalingConfig `json:"scalingConfig,omitempty"` + + ReportScalingEvents bool `json:"reportScalingEvents"` + ReportDesiredScaling bool `json:"reportDesiredScaling"` } // Using returns the Resources that this VmInfo says the VM is using @@ -186,6 +201,8 @@ func extractVmInfoGeneric( autoMigrationEnabled := HasAutoMigrationEnabled(obj) scalingEnabled := HasAutoscalingEnabled(obj) alwaysMigrate := HasAlwaysMigrateLabel(obj) + reportScalingEvents := HasReportScalingEventsLabel(obj) + reportDesiredScaling := HasReportDesiredScalingLabel(obj) info := VmInfo{ Name: vmName, @@ -197,6 +214,9 @@ func extractVmInfoGeneric( AlwaysMigrate: alwaysMigrate, ScalingEnabled: scalingEnabled, ScalingConfig: nil, // set below, maybe + + ReportScalingEvents: reportScalingEvents, + ReportDesiredScaling: reportDesiredScaling, }, CurrentRevision: nil, // set later, maybe }