diff --git a/ARCHITECTURE-network-diagram.org b/ARCHITECTURE-network-diagram.org
index 335dcb68a..9c565a8a8 100644
--- a/ARCHITECTURE-network-diagram.org
+++ b/ARCHITECTURE-network-diagram.org
@@ -32,29 +32,29 @@ awk '/#\+BEGIN_SRC/{flag=1;next}/#\+END_SRC/{flag=0}flag' ARCHITECTURE-network-d
 |         +---------| autoscaler agent   |
 |         |         |                    |
 |         |         | (one per K8s node) |
-|         |         +-----------------*--+
-|         |                   |       ^ 10302
-|         |                   |       |
-|         |                   |       |
-+=================|==================================|===========|===|======+
-: K8s pod         |                                  |           |   |      :
-:       QMP       |                                  |           |   |      :
-:     20183       V                                  |           |   |      :
-:       +---------------*----------------------------|-----------|---|---+  :
-:       |               |                            |           |   |   |  :
-:       | QEMU process  |                            |           |   |   |  :
-:       |               |                            |           |   |   |  :
-:       |               |                            |           |   |   |  :
-:       | compute_ctl   postgres   metrics          | informant  |   |   |  :
-:       |  mgmt API     postgres   prometheus       | informant  |   |   |  :
-:       |    3080         5432       9100   V  10301 V           |   |      :
+|         |         +--------------------+
+|         |                   |
+|         |                   |
+|         |                   |
++=================|==================================|===========|==========+
+: K8s pod         |                                  |           |          :
+:       QMP       |                                  |           |          :
+:     20183       V                                  |           |          :
+:       +---------------*----------------------------------|-----------|-------+  :
+:       |               |                                  |           |       |  :
+:       | QEMU process  |                                  |           |       |  :
+:       |               |                                  |           |       |  :
+:       |               |                                  |           |       |  :
+:       | compute_ctl   postgres   metrics                | monitor   |       |  :
+:       |  mgmt API     postgres   prometheus             | websocket |       |  :
+:       |    3080         5432       9100   V       10301 V                   |  :
 :       +------------------------*-----------*-------------*-----------*-------+  :
 :       | VM                                                                   |  :
 :       |                                                                      |  :
 :       | Inside the VM runs:                                                  |  :
 :       | - compute_ctl (listens on port 3080)                                 |  :
+:       | - VM monitor (port 10301 via websocket)                              |  :
 :       | - Postgres (port 5432)                                               |  :
-:       | - VM informant (port 10301)                                          |  :
 :       | - vector (metrics on port 9100)                                      |  :
 :       |                                                                      |  :
 :       +----------------------------------------------------------------------+  :
diff --git a/ARCHITECTURE-network-diagram.png b/ARCHITECTURE-network-diagram.png
index febd8cd59..e58c91c98 100644
Binary files a/ARCHITECTURE-network-diagram.png and b/ARCHITECTURE-network-diagram.png differ
diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index 976bd241e..337c5a2e5 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -49,10 +49,10 @@ A third component, a binary running inside of the VM to (a) handle being upscale
 The scheduler plugin is responsible for handling resource requests from the `autoscaler-agent`,
 capping increases so that node resources aren't overcommitted.
 
-The `autoscaler-agent` periodically reads from a metrics source in the VM (defined by the
-_informant_) and makes scaling decisions about the _desired_ resource allocation. It then
-requests these resources from the scheduler plugin, and submits a patch request for its NeonVM to
-update the resources.
+The `autoscaler-agent` periodically reads from a metrics source in the VM (currently vector's
+`node_exporter`-like functionality) and makes scaling decisions about the _desired_ resource
+allocation. It then requests these resources from the scheduler plugin, and submits a patch request
+for its NeonVM to update the resources.
 
 The VM monitor is responsible for handling all of the resource management functionality inside the
 VM that the `autoscaler-agent` cannot. This constitutes handling upscales (eg. increasing Postgres
diff --git a/deploy/agent/config_map.yaml b/deploy/agent/config_map.yaml
index 3e9d05700..5b57cc53e 100644
--- a/deploy/agent/config_map.yaml
+++ b/deploy/agent/config_map.yaml
@@ -13,23 +13,17 @@ data:
           "memoryUsageFractionTarget": 0.75
         }
       },
-      "informant": {
-        "serverPort": 10301,
-        "callbackPort": 10302,
-        "retryServerMinWaitSeconds": 5,
-        "retryServerNormalWaitSeconds": 5,
-        "registerRetrySeconds": 5,
-        "requestTimeoutSeconds": 1,
-        "registerTimeoutSeconds": 2,
-        "downscaleTimeoutSeconds": 2,
-        "unhealthyAfterSilenceDurationSeconds": 20,
-        "unhealthyStartupGracePeriodSeconds": 20
-      },
       "monitor": {
+        "serverPort": 10301,
         "responseTimeoutSeconds": 5,
-        "connectionTimeoutSeconds": 4
+        "connectionTimeoutSeconds": 4,
+        "connectionRetryMinWaitSeconds": 5,
+        "unhealthyAfterSilenceDurationSeconds": 20,
+        "unhealthyStartupGracePeriodSeconds": 20,
+        "maxHealthCheckSequentialFailuresSeconds": 30
       },
       "metrics": {
+        "port": 9100,
         "loadMetricPrefix": "host_",
         "requestTimeoutSeconds": 2,
         "secondsBetweenRequests": 5
diff --git a/pkg/agent/config.go b/pkg/agent/config.go
index 49b2f97f4..ebd1a5108 100644
--- a/pkg/agent/config.go
+++ b/pkg/agent/config.go
@@ -13,7 +13,6 @@ import (
 
 type Config struct {
 	Scaling   ScalingConfig   `json:"scaling"`
-	Informant InformantConfig `json:"informant"`
 	Metrics   MetricsConfig   `json:"metrics"`
 	Scheduler SchedulerConfig `json:"scheduler"`
 	Monitor   MonitorConfig   `json:"monitor"`
@@ -26,6 +25,21 @@ type MonitorConfig struct {
 	// ConnectionTimeoutSeconds gives how long we may take to connect to the
 	// monitor before cancelling.
 	ConnectionTimeoutSeconds uint `json:"connectionTimeoutSeconds"`
+	// ConnectionRetryMinWaitSeconds gives the minimum amount of time we must wait between attempts
+	// to connect to the vm-monitor, regardless of whether they're successful.
+	ConnectionRetryMinWaitSeconds uint `json:"connectionRetryMinWaitSeconds"`
+	// ServerPort is the port that the dispatcher serves from
+	ServerPort uint16 `json:"serverPort"`
+	// UnhealthyAfterSilenceDurationSeconds gives the duration, in seconds, after which failing to
+	// receive a successful request from the monitor indicates that it is probably unhealthy.
+	UnhealthyAfterSilenceDurationSeconds uint `json:"unhealthyAfterSilenceDurationSeconds"`
+	// UnhealthyStartupGracePeriodSeconds gives the duration, in seconds, after which we will no
+	// longer excuse total VM monitor failures - i.e. when unhealthyAfterSilenceDurationSeconds
+	// kicks in.
+	UnhealthyStartupGracePeriodSeconds uint `json:"unhealthyStartupGracePeriodSeconds"`
+	// MaxHealthCheckSequentialFailuresSeconds gives the duration, in seconds, after which we
+	// should restart the connection to the vm-monitor if health checks aren't succeeding.
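+	// (For reference: the deployed config above sets this to 30, i.e. the connection is
+	// restarted after roughly 30 seconds of consecutive health check failures.)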
+	MaxHealthCheckSequentialFailuresSeconds uint `json:"maxHealthCheckSequentialFailuresSeconds"`
 }
 
 // DumpStateConfig configures the endpoint to dump all internal state
@@ -46,50 +60,10 @@ type ScalingConfig struct {
 	DefaultConfig api.ScalingConfig `json:"defaultConfig"`
 }
 
-type InformantConfig struct {
-	// ServerPort is the port that the VM informant serves from
-	ServerPort uint16 `json:"serverPort"`
-
-	// CallbackPort is the port that the agent listens on for informant -> agent requests
-	CallbackPort int `json:"callbackPort"`
-
-	// RetryServerMinWaitSeconds gives the minimum duration, in seconds, that we must wait between the
-	// start of one InformantServer and the next
-	//
-	// This "minimum wait" is only used when the
-	RetryServerMinWaitSeconds uint `json:"retryServerMinWaitSeconds"`
-	// RetryServerNormalWaitSeconds gives the typical duration, in seconds, that we wait between an
-	// InformantServer failing and our retry.
-	RetryServerNormalWaitSeconds uint `json:"retryServerNormalWaitSeconds"`
-	// RegisterRetrySeconds gives the duration, in seconds, to wait between retrying a failed
-	// register request.
-	RegisterRetrySeconds uint `json:"registerRetrySeconds"`
-
-	// RequestTimeoutSeconds gives the timeout for any individual request to the informant, except
-	// for those with separately-defined values below.
-	RequestTimeoutSeconds uint `json:"requestTimeoutSeconds"`
-	// RegisterTimeoutSeconds gives the timeout duration, in seconds, for a register request.
-	//
-	// This is a separate field from RequestTimeoutSeconds because registering may require that the
-	// informant suspend a previous agent, which could take longer.
-	RegisterTimeoutSeconds uint `json:"registerTimeoutSeconds"`
-	// DownscaleTimeoutSeconds gives the timeout duration, in seconds, for a downscale request.
-	//
-	// This is a separate field from RequestTimeoutSeconds because it's possible that downscaling
-	// may require some non-trivial work that we want to allow to complete.
-	DownscaleTimeoutSeconds uint `json:"downscaleTimeoutSeconds"`
-
-	// UnhealthyAfterSilenceDurationSeconds gives the duration, in seconds, after which failing to
-	// receive a successful request from the informant indicates that it is probably unhealthy.
-	UnhealthyAfterSilenceDurationSeconds uint `json:"unhealthyAfterSilenceDurationSeconds"`
-	// UnhealthyStartupGracePeriodSeconds gives the duration, in seconds, after which we will no
-	// longer excuse total VM informant failures - i.e. when unhealthyAfterSilenceDurationSeconds
-	// kicks in.
-	UnhealthyStartupGracePeriodSeconds uint `json:"unhealthyStartupGracePeriodSeconds"`
-}
-
 // MetricsConfig defines a few parameters for metrics requests to the VM
 type MetricsConfig struct {
+	// Port is the port that VMs are expected to provide metrics on
+	Port uint16 `json:"port"`
 	// LoadMetricPrefix is the prefix at the beginning of the load metrics that we use. For
 	// node_exporter, this is "node_", and for vector it's "host_"
 	LoadMetricPrefix string `json:"loadMetricPrefix"`
@@ -152,21 +126,17 @@ func (c *Config) validate() error {
 	erc.Whenf(ec, c.Billing != nil && c.Billing.URL == "", emptyTmpl, ".billing.url")
 	erc.Whenf(ec, c.DumpState != nil && c.DumpState.Port == 0, zeroTmpl, ".dumpState.port")
 	erc.Whenf(ec, c.DumpState != nil && c.DumpState.TimeoutSeconds == 0, zeroTmpl, ".dumpState.timeoutSeconds")
-	erc.Whenf(ec, c.Informant.DownscaleTimeoutSeconds == 0, zeroTmpl, ".informant.downscaleTimeoutSeconds")
-	erc.Whenf(ec, c.Informant.RegisterRetrySeconds == 0, zeroTmpl, ".informant.registerRetrySeconds")
-	erc.Whenf(ec, c.Informant.RegisterTimeoutSeconds == 0, zeroTmpl, ".informant.registerTimeoutSeconds")
-	erc.Whenf(ec, c.Informant.RequestTimeoutSeconds == 0, zeroTmpl, ".informant.requestTimeoutSeconds")
-	erc.Whenf(ec, c.Informant.RetryServerMinWaitSeconds == 0, zeroTmpl, ".informant.retryServerMinWaitSeconds")
-	erc.Whenf(ec, c.Informant.RetryServerNormalWaitSeconds == 0, zeroTmpl, ".informant.retryServerNormalWaitSeconds")
-	erc.Whenf(ec, c.Informant.ServerPort == 0, zeroTmpl, ".informant.serverPort")
-	erc.Whenf(ec, c.Informant.CallbackPort == 0, zeroTmpl, ".informant.callbackPort")
-	erc.Whenf(ec, c.Informant.UnhealthyAfterSilenceDurationSeconds == 0, zeroTmpl, ".informant.unhealthyAfterSilenceDurationSeconds")
-	erc.Whenf(ec, c.Informant.UnhealthyStartupGracePeriodSeconds == 0, zeroTmpl, ".informant.unhealthyStartupGracePeriodSeconds")
+	erc.Whenf(ec, c.Metrics.Port == 0, zeroTmpl, ".metrics.port")
 	erc.Whenf(ec, c.Metrics.LoadMetricPrefix == "", emptyTmpl, ".metrics.loadMetricPrefix")
 	erc.Whenf(ec, c.Metrics.SecondsBetweenRequests == 0, zeroTmpl, ".metrics.secondsBetweenRequests")
 	erc.Whenf(ec, c.Scaling.RequestTimeoutSeconds == 0, zeroTmpl, ".scaling.requestTimeoutSeconds")
 	erc.Whenf(ec, c.Monitor.ResponseTimeoutSeconds == 0, zeroTmpl, ".monitor.responseTimeoutSeconds")
 	erc.Whenf(ec, c.Monitor.ConnectionTimeoutSeconds == 0, zeroTmpl, ".monitor.connectionTimeoutSeconds")
+	erc.Whenf(ec, c.Monitor.ConnectionRetryMinWaitSeconds == 0, zeroTmpl, ".monitor.connectionRetryMinWaitSeconds")
+	erc.Whenf(ec, c.Monitor.ServerPort == 0, zeroTmpl, ".monitor.serverPort")
+	erc.Whenf(ec, c.Monitor.UnhealthyAfterSilenceDurationSeconds == 0, zeroTmpl, ".monitor.unhealthyAfterSilenceDurationSeconds")
+	erc.Whenf(ec, c.Monitor.UnhealthyStartupGracePeriodSeconds == 0, zeroTmpl, ".monitor.unhealthyStartupGracePeriodSeconds")
+	erc.Whenf(ec, c.Monitor.MaxHealthCheckSequentialFailuresSeconds == 0, zeroTmpl, ".monitor.maxHealthCheckSequentialFailuresSeconds")
 	// add all errors if there are any: https://github.com/neondatabase/autoscaling/pull/195#discussion_r1170893494
 	ec.Add(c.Scaling.DefaultConfig.Validate())
 	erc.Whenf(ec, c.Scheduler.RequestPort == 0, zeroTmpl, ".scheduler.requestPort")
diff --git a/pkg/agent/dispatcher.go b/pkg/agent/dispatcher.go
index e8e0b8ae0..8d2377735 100644
--- a/pkg/agent/dispatcher.go
+++ b/pkg/agent/dispatcher.go
@@ -48,12 +48,18 @@ type Dispatcher struct {
 	// message and will send it down the SignalSender so the original sender can use it.
 	waiters map[uint64]util.SignalSender[waiterResult]
 
-	// lock guards mutating the waiters field. conn, logger, and nextTransactionID
-	// are all thread safe. server and protoVersion are never modified.
+	// lock guards mutating the waiters, exitError, and (closing) exitSignal fields.
+	// conn and lastTransactionID are both thread safe.
+	// runner, exit, and protoVersion are never modified.
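+	// (Lock ordering note, for readers: Call(), below, documents that it must not be invoked
+	// while holding disp.runner.lock; disp.lock itself is only held for short updates to the
+	// fields above.)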
 	lock sync.Mutex
 
-	// The InformantServer that this dispatcher is part of
-	server *InformantServer
+	// The runner that this dispatcher is part of
+	runner *Runner
+
+	exit func(status websocket.StatusCode, err error)
+
+	exitError  error
+	exitSignal chan struct{}
 
 	// lastTransactionID is the last transaction id. When we need a new one
 	// we simply bump it and take the new number.
@@ -63,8 +69,6 @@ type Dispatcher struct {
 	// odd ones. So generating a new value is done by adding 2.
 	lastTransactionID atomic.Uint64
 
-	logger *zap.Logger
-
 	protoVersion api.MonitorProtoVersion
 }
 
@@ -73,36 +77,150 @@ type waiterResult struct {
 	res *MonitorResult
 }
 
-// Create a new Dispatcher, establishing a connection with the informant.
-// Note that this does not immediately start the Dispatcher. Call Run() to start it.
+// Create a new Dispatcher, establishing a connection with the vm-monitor and setting up all the
+// background threads to manage the connection.
 func NewDispatcher(
 	ctx context.Context,
 	logger *zap.Logger,
 	addr string,
-	parent *InformantServer,
-) (*Dispatcher, error) {
-	// parent.runner, runner.global, and global.config are immutable so we don't
-	// need to acquire runner.lock here
-	ctx, cancel := context.WithTimeout(
-		ctx,
-		time.Second*time.Duration(parent.runner.global.config.Monitor.ConnectionTimeoutSeconds),
-	)
+	runner *Runner,
+	sendUpscaleRequested util.CondChannelSender,
+) (_finalDispatcher *Dispatcher, _ error) {
+	// Create a new root-level context for this Dispatcher so that we can cancel if need be
+	ctx, cancelRootContext := context.WithCancel(ctx)
+	defer func() {
+		// cancel on failure or panic
+		if _finalDispatcher == nil {
+			cancelRootContext()
+		}
+	}()
+
+	connectTimeout := time.Second * time.Duration(runner.global.config.Monitor.ConnectionTimeoutSeconds)
+	conn, protoVersion, err := connectToMonitor(ctx, logger, addr, connectTimeout)
+	if err != nil {
+		return nil, err
+	}
+
+	disp := &Dispatcher{
+		conn:              conn,
+		waiters:           make(map[uint64]util.SignalSender[waiterResult]),
+		runner:            runner,
+		lock:              sync.Mutex{},
+		exit:              nil, // set below
+		exitError:         nil,
+		exitSignal:        make(chan struct{}),
+		lastTransactionID: atomic.Uint64{}, // Note: initialized to 0, so it's even, as required.
+		protoVersion:      *protoVersion,
+	}
+	disp.exit = func(status websocket.StatusCode, err error) {
+		disp.lock.Lock()
+		defer disp.lock.Unlock()
+
+		if disp.Exited() {
+			return
+		}
+
+		close(disp.exitSignal)
+		disp.exitError = err
+		cancelRootContext()
+
+		var closeReason string
+		if err != nil {
+			closeReason = err.Error()
+		} else {
+			closeReason = "normal exit"
+		}
+
+		// Run the actual websocket closing in a separate goroutine so we don't block while holding
+		// the lock. It can take up to 10s to close:
+		//
+		// > [Close] will write a WebSocket close frame with a timeout of 5s and then wait 5s for
		// > the peer to send a close frame.
+		//
+		// This *potentially* runs us into race issues, but those are probably less bad to deal
+		// with, tbh.
+		go disp.conn.Close(status, closeReason)
+	}
+
+	go func() {
+		<-ctx.Done()
+		disp.exit(websocket.StatusNormalClosure, nil)
+	}()
+
+	msgHandlerLogger := logger.Named("message-handler")
+	runner.spawnBackgroundWorker(ctx, msgHandlerLogger, "vm-monitor message handler", func(c context.Context, l *zap.Logger) {
+		disp.run(c, l, sendUpscaleRequested)
+	})
+	runner.spawnBackgroundWorker(ctx, logger.Named("health-checks"), "vm-monitor health checks", func(ctx context.Context, logger *zap.Logger) {
+		timeout := time.Second * time.Duration(runner.global.config.Monitor.ResponseTimeoutSeconds)
+		// FIXME: make this duration configurable
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+
+		// if we've had sequential failures for more than continuedFailureAbortTimeout, restart
+		// the connection
+		var firstSequentialFailure *time.Time
+		continuedFailureAbortTimeout := time.Second * time.Duration(runner.global.config.Monitor.MaxHealthCheckSequentialFailuresSeconds)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+
+			_, err := disp.Call(ctx, logger, timeout, "HealthCheck", api.HealthCheck{})
+			if err != nil {
+				logger.Warn("vm-monitor health check failed", zap.Error(err))
+
+				if firstSequentialFailure == nil {
+					now := time.Now()
+					firstSequentialFailure = &now
+				} else if since := time.Since(*firstSequentialFailure); since > continuedFailureAbortTimeout {
+					err := fmt.Errorf("vm-monitor has been failing health checks for at least %s", continuedFailureAbortTimeout)
+					logger.Error(fmt.Sprintf("%s, triggering connection restart", err.Error()))
+					disp.exit(websocket.StatusInternalError, err)
+				}
+			} else {
+				// health check was successful, so reset the sequential failures count
+				firstSequentialFailure = nil
+
+				runner.status.update(runner.global, func(s podStatus) podStatus {
+					now := time.Now()
+					s.lastSuccessfulMonitorComm = &now
+					return s
+				})
+			}
+		}
+	})
+	return disp, nil
+}
+
+func connectToMonitor(
+	ctx context.Context,
+	logger *zap.Logger,
+	addr string,
+	timeout time.Duration,
+) (_ *websocket.Conn, _ *api.MonitorProtoVersion, finalErr error) {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
 
-	logger.Info("connecting via websocket", zap.String("addr", addr))
+	logger.Info("Connecting to vm-monitor via websocket", zap.String("addr", addr))
 
 	// We do not need to close the response body according to docs.
 	// Doing so causes memory bugs.
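+	// (Overview of the handshake below: dial the websocket, send our supported protocol version
+	// range, then read back the vm-monitor's MonitorProtocolResponse containing either the
+	// negotiated version or an error.)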
 	c, _, err := websocket.Dial(ctx, addr, nil) //nolint:bodyclose // see comment above
 	if err != nil {
-		return nil, fmt.Errorf("error establishing websocket connection to %s: %w", addr, err)
+		return nil, nil, fmt.Errorf("error establishing websocket connection to %s: %w", addr, err)
 	}
 
 	// If we return early, make sure we close the websocket
-	connectionOk := false
+	var failureReason websocket.StatusCode
 	defer func() {
-		if !connectionOk {
-			c.Close(websocket.StatusInternalError, "could not establish protocol")
+		if finalErr != nil {
+			if failureReason == 0 {
+				failureReason = websocket.StatusInternalError
+			}
+			c.Close(failureReason, finalErr.Error())
 		}
 	}()
 
@@ -110,52 +228,70 @@ func NewDispatcher(
 		Min: MinMonitorProtocolVersion,
 		Max: MaxMonitorProtocolVersion,
 	}
-	logger.Info("sending protocol version range", zap.Any("range", versionRange))
+	logger.Info("Sending protocol version range", zap.Any("range", versionRange))
 
 	// Figure out protocol version
 	err = wsjson.Write(ctx, c, versionRange)
 	if err != nil {
-		return nil, fmt.Errorf("error sending protocol range to monitor: %w", err)
+		return nil, nil, fmt.Errorf("error sending protocol range to monitor: %w", err)
 	}
 
-	logger.Info("reading monitor version response")
-	var version api.MonitorProtocolResponse
-	err = wsjson.Read(ctx, c, &version)
+	logger.Info("Reading monitor version response")
+	var resp api.MonitorProtocolResponse
+	err = wsjson.Read(ctx, c, &resp)
 	if err != nil {
-		logger.Error("failed to read monitor response", zap.Error(err))
-		return nil, fmt.Errorf("error reading monitor response during protocol handshake: %w", err)
+		logger.Error("Failed to read monitor response", zap.Error(err))
+		failureReason = websocket.StatusProtocolError
+		return nil, nil, fmt.Errorf("Error reading vm-monitor response during protocol handshake: %w", err)
 	}
-	logger.Info("got monitor version response", zap.Any("response", version))
-	if version.Error != nil {
-		return nil, fmt.Errorf("monitor returned error during protocol handshake: %q", *version.Error)
+
+	logger.Info("Got monitor version response", zap.Any("response", resp))
+	if resp.Error != nil {
+		logger.Error("Got error response from vm-monitor", zap.Any("response", resp), zap.String("error", *resp.Error))
+		failureReason = websocket.StatusProtocolError
+		return nil, nil, fmt.Errorf("Monitor returned error during protocol handshake: %q", *resp.Error)
 	}
-	logger.Info("negotiated protocol version with monitor", zap.String("version", version.Version.String()))
-
-	disp := &Dispatcher{
-		conn:              c,
-		waiters:           make(map[uint64]util.SignalSender[waiterResult]),
-		lastTransactionID: atomic.Uint64{}, // Note: initialized to 0, so it's even, as required.
-		logger:            logger.Named("dispatcher"),
-		protoVersion:      version.Version,
-		server:            parent,
-		lock:              sync.Mutex{},
+
+	logger.Info("negotiated protocol version with monitor", zap.Any("response", resp), zap.String("version", resp.Version.String()))
+	return c, &resp.Version, nil
+}
+
+// ExitSignal returns a channel that is closed when the Dispatcher is no longer running
+func (disp *Dispatcher) ExitSignal() <-chan struct{} {
+	return disp.exitSignal
+}
+
+// Exited returns whether the Dispatcher is no longer running
+//
+// Exited will return true iff the channel returned by ExitSignal is closed.
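+//
+// A hypothetical caller that would rather block than poll can select on ExitSignal and then
+// inspect ExitError, along the lines of:
+//
+//	select {
+//	case <-disp.ExitSignal():
+//		err := disp.ExitError() // nil on normal exit
+//		// ... schedule a reconnect ...
+//	case <-ctx.Done():
+//	}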
+func (disp *Dispatcher) Exited() bool {
+	select {
+	case <-disp.exitSignal:
+		return true
+	default:
+		return false
 	}
-	connectionOk = true
-	return disp, nil
+}
+
+// ExitError returns the error that caused the dispatcher to exit, if there was one
+func (disp *Dispatcher) ExitError() error {
+	disp.lock.Lock()
+	defer disp.lock.Unlock()
+	return disp.exitError
 }
 
 // Send a message down the connection. Only call this method with types that
-// SerializeInformantMessage can handle.
-func (disp *Dispatcher) send(ctx context.Context, id uint64, message any) error {
-	data, err := api.SerializeInformantMessage(message, id)
+// SerializeMonitorMessage can handle.
+func (disp *Dispatcher) send(ctx context.Context, logger *zap.Logger, id uint64, message any) error {
+	data, err := api.SerializeMonitorMessage(message, id)
 	if err != nil {
 		return fmt.Errorf("error serializing message: %w", err)
 	}
 	// wsjson.Write serializes whatever is passed in, and go serializes []byte
 	// by base64 encoding it, so use RawMessage to avoid serializing to []byte
-	// (done by SerializeInformantMessage), and then base64 encoding again
+	// (done by SerializeMonitorMessage), and then base64 encoding again
 	raw := json.RawMessage(data)
-	disp.logger.Info("sending message to monitor", zap.ByteString("message", raw))
+	logger.Info("sending message to monitor", zap.ByteString("message", raw))
 	return wsjson.Write(ctx, disp.conn, &raw)
 }
 
@@ -175,9 +311,12 @@ func (disp *Dispatcher) unregisterWaiter(id uint64) {
 }
 
 // Make a request to the monitor and wait for a response. The value passed as message must be a
-// valid value to send to the monitor. See the docs for SerializeInformantMessage for more.
+// valid value to send to the monitor. See the docs for SerializeMonitorMessage for more.
+//
+// This function must NOT be called while holding disp.runner.lock.
 func (disp *Dispatcher) Call(
 	ctx context.Context,
+	logger *zap.Logger,
 	timeout time.Duration,
 	messageType string,
 	message any,
@@ -187,15 +326,15 @@ func (disp *Dispatcher) Call(
 	status := "internal error"
 	defer func() {
-		disp.server.runner.global.metrics.informantRequestsOutbound.WithLabelValues(messageType, status).Inc()
+		disp.runner.global.metrics.monitorRequestsOutbound.WithLabelValues(messageType, status).Inc()
 	}()
 
 	// register the waiter *before* sending, so that we avoid a potential race where we'd get a
 	// reply to the message before being ready to receive it.
 	disp.registerWaiter(id, sender)
-	err := disp.send(ctx, id, message)
+	err := disp.send(ctx, logger, id, message)
 	if err != nil {
-		disp.logger.Error("failed to send message", zap.Any("message", message), zap.Error(err))
+		logger.Error("failed to send message", zap.Any("message", message), zap.Error(err))
 		disp.unregisterWaiter(id)
 		status = "[error: failed to send]"
 		return nil, err
@@ -261,25 +400,25 @@ func (disp *Dispatcher) HandleMessage(
 	// avoids this, and we manually deserialize later
 	var message json.RawMessage
 	if err := wsjson.Read(ctx, disp.conn, &message); err != nil {
-		return fmt.Errorf("error receiving message: %w", err)
+		return fmt.Errorf("Error receiving message: %w", err)
 	}
 
 	logger.Info("(pre-decoding): received a message", zap.ByteString("message", message))
 
 	var unstructured map[string]interface{}
 	if err := json.Unmarshal(message, &unstructured); err != nil {
-		return fmt.Errorf("error deserializing message: %q", string(message))
+		return fmt.Errorf("Error deserializing message: %q", string(message))
 	}
 
 	typeStr, err := extractField[string](unstructured, "type")
 	if err != nil {
-		return fmt.Errorf("error extracting 'type' field: %w", err)
+		return fmt.Errorf("Error extracting 'type' field: %w", err)
 	}
 
 	// go thinks all json numbers are float64 so we first deserialize to that to
 	// avoid the type error, then cast to uint64
 	f, err := extractField[float64](unstructured, "id")
 	if err != nil {
-		return fmt.Errorf("error extracting 'id field: %w", err)
+		return fmt.Errorf("Error extracting 'id' field: %w", err)
 	}
 	id := uint64(*f)
 
@@ -314,7 +453,7 @@ func (disp *Dispatcher) HandleMessage(
 		// we had some error while handling the message with this ID, and there wasn't a
 		// corresponding waiter. We should make note of this in the metrics:
 		status := fmt.Sprintf("[error: %s]", rootErr)
-		disp.server.runner.global.metrics.informantRequestsInbound.WithLabelValues(*typeStr, status)
+		disp.runner.global.metrics.monitorRequestsInbound.WithLabelValues(*typeStr, status)
 	}
 
 	// resume panicking if we were before
@@ -326,10 +465,11 @@ func (disp *Dispatcher) HandleMessage(
 	// Helper function to handle common unmarshalling logic
 	unmarshal := func(value any) error {
 		if err := json.Unmarshal(message, value); err != nil {
-			rootErr = errors.New("failed unmarshaling JSON")
-			err := fmt.Errorf("error unmarshaling %s: %w", *typeStr, err)
+			rootErr = errors.New("Failed unmarshaling JSON")
+			err := fmt.Errorf("Error unmarshaling %s: %w", *typeStr, err)
+			logger.Error(rootErr.Error(), zap.Error(err))
 			// we're already on the error path anyways
-			_ = disp.send(ctx, id, api.InvalidMessage{Error: err.Error()})
+			_ = disp.send(ctx, logger, id, api.InvalidMessage{Error: err.Error()})
 			return err
 		}
 
@@ -373,45 +513,45 @@ func (disp *Dispatcher) HandleMessage(
 		if err := unmarshal(&warning); err != nil {
 			return err
 		}
-		disp.logger.Warn("received notification we sent an invalid message", zap.Any("warning", warning))
+		logger.Warn("Received notification we sent an invalid message", zap.Any("warning", warning))
 		return nil
 	default:
-		rootErr = errors.New("received unknown message type")
+		rootErr = errors.New("Received unknown message type")
 		return disp.send(
 			ctx,
+			logger,
 			id,
-			api.InvalidMessage{Error: fmt.Sprintf("received message of unknown type: %q", *typeStr)},
+			api.InvalidMessage{Error: fmt.Sprintf("Received message of unknown type: %q", *typeStr)},
 		)
 	}
 }
 
 // Long running function that orchestrates all requests/responses.
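+//
+// (The message types handled here are UpscaleRequest, UpscaleConfirmation, DownscaleResult,
+// InternalError, HealthCheck, and InvalidMessage; see the handlers defined below.)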
-func (disp *Dispatcher) run(ctx context.Context) {
-	logger := disp.logger.Named("message-handler")
-	logger.Info("starting message handler")
+func (disp *Dispatcher) run(ctx context.Context, logger *zap.Logger, upscaleRequester util.CondChannelSender) {
+	logger.Info("Starting message handler")
 
 	// Utility for logging + returning an error when we get a message with an
 	// id we're unaware of. Note: unknownMessage is not a message type.
 	handleUnkownMessage := func(messageType string, id uint64) error {
-		fmtString := "received %s with id %d but no record of previous message with that id"
+		fmtString := "Received %s with id %d but no record of previous message with that id"
 		msg := fmt.Sprintf(fmtString, messageType, id)
 		logger.Warn(msg, zap.Uint64("id", id))
-		return disp.send(ctx, id, api.InvalidMessage{Error: msg})
+		return disp.send(ctx, logger, id, api.InvalidMessage{Error: msg})
 	}
 
 	// Does not take a message id because we don't know when the agent will
 	// upscale. The monitor will get the result back as a NotifyUpscale message
 	// from us, with a new id.
 	handleUpscaleRequest := func(req api.UpscaleRequest) {
-		disp.server.runner.lock.Lock()
-		defer disp.server.runner.lock.Unlock()
+		disp.runner.lock.Lock()
+		defer disp.runner.lock.Unlock()
 
 		// TODO: it shouldn't be this function's responsibility to update metrics.
 		defer func() {
-			disp.server.runner.global.metrics.informantRequestsInbound.WithLabelValues("UpscaleRequest", "ok")
+			disp.runner.global.metrics.monitorRequestsInbound.WithLabelValues("UpscaleRequest", "ok")
 		}()
 
-		disp.server.upscaleRequested.Send()
+		upscaleRequester.Send()
 
 		resourceReq := api.MoreResources{
 			Cpu:    false,
@@ -420,10 +560,10 @@ func (disp *Dispatcher) run(ctx context.Context) {
 
 		logger.Info(
 			"Updating requested upscale",
-			zap.Any("oldRequested", disp.server.runner.requestedUpscale),
+			zap.Any("oldRequested", disp.runner.requestedUpscale),
 			zap.Any("newRequested", resourceReq),
 		)
-		disp.server.runner.requestedUpscale = resourceReq
+		disp.runner.requestedUpscale = resourceReq
 	}
 	handleUpscaleConfirmation := func(_ api.UpscaleConfirmation, id uint64) error {
 		disp.lock.Lock()
@@ -431,7 +571,7 @@ func (disp *Dispatcher) run(ctx context.Context) {
 
 		sender, ok := disp.waiters[id]
 		if ok {
-			logger.Info("monitor confirmed upscale", zap.Uint64("id", id))
+			logger.Info("vm-monitor confirmed upscale", zap.Uint64("id", id))
 			sender.Send(waiterResult{
 				err: nil,
 				res: &MonitorResult{
@@ -453,7 +593,7 @@ func (disp *Dispatcher) run(ctx context.Context) {
 
 		sender, ok := disp.waiters[id]
 		if ok {
-			logger.Info("monitor returned downscale result", zap.Uint64("id", id))
+			logger.Info("vm-monitor returned downscale result", zap.Uint64("id", id), zap.Any("result", res))
 			sender.Send(waiterResult{
 				err: nil,
 				res: &MonitorResult{
@@ -476,13 +616,13 @@ func (disp *Dispatcher) run(ctx context.Context) {
 		sender, ok := disp.waiters[id]
 		if ok {
 			logger.Warn(
-				"monitor experienced an internal error",
-				zap.String("error", err.Error),
+				"vm-monitor experienced an internal error",
 				zap.Uint64("id", id),
+				zap.String("error", err.Error),
 			)
 			// Indicate to the receiver that an error occurred
 			sender.Send(waiterResult{
-				err: errors.New("monitor internal error"),
+				err: errors.New("vm-monitor internal error"),
 				res: nil,
 			})
 			// Don't forget to delete the waiter
@@ -498,7 +638,7 @@ func (disp *Dispatcher) run(ctx context.Context) {
 
 		sender, ok := disp.waiters[id]
 		if ok {
-			logger.Info("monitor responded to health check", zap.Uint64("id", id))
+			logger.Info("vm-monitor responded to health check", zap.Uint64("id", id))
 			// Indicate to the receiver that the health check succeeded
 			sender.Send(waiterResult{
 				err: nil,
@@ -525,28 +665,20 @@ func (disp *Dispatcher) run(ctx context.Context) {
 	}
 
 	for {
-		err := disp.HandleMessage(
-			ctx,
-			logger,
-			handlers,
-		)
+		err := disp.HandleMessage(ctx, logger, handlers)
 		if err != nil {
 			if ctx.Err() != nil {
 				// The context is already cancelled, so this error is most likely
-				// expected. For example, if the context is cancelled becaues the
-				// informant server exited, we should expect to fail to read off the
-				// connection, which is closed by the server exit.
-				logger.Warn("context is already cancelled, but received an error", zap.Error(err))
+				// expected. For example, if the context is cancelled because the
+				// runner exited, we should expect to fail to read off the connection,
+				// which is closed by the server exit.
+				logger.Warn("Error handling message", zap.Error(err))
 			} else {
-				func() {
-					logger.Error("error handling message -> triggering informant server exit", zap.Error(err))
-					disp.server.runner.lock.Lock()
-					defer disp.server.runner.lock.Unlock()
-					disp.server.exit(InformantServerExitStatus{
-						Err:            err,
-						RetryShouldFix: false,
-					})
-				}()
+				logger.Error("Error handling message, shutting down connection", zap.Error(err))
+				err = fmt.Errorf("Error handling message: %w", err)
+				// note: in theory we *could* be more descriptive with these statuses, but the only
+				// consumer of this API is the vm-monitor, and it doesn't check those.
+				disp.exit(websocket.StatusInternalError, err)
 			}
 			return
 		}
diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go
index 5e29ef7b4..c8f8dbed0 100644
--- a/pkg/agent/entrypoint.go
+++ b/pkg/agent/entrypoint.go
@@ -26,11 +26,6 @@ type MainRunner struct {
 }
 
 func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
-	informantServer, err := StartHttpMuxServer(logger, r.Config.Informant.CallbackPort)
-	if err != nil {
-		return fmt.Errorf("Error starting muxed informant server: %w", err)
-	}
-
 	vmEventQueue := pubsub.NewUnlimitedQueue[vmEvent]()
 	defer vmEventQueue.Close()
 	pushToQueue := func(ev vmEvent) {
@@ -60,7 +55,7 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
 	}
 	defer schedulerStore.Stop()
 
-	globalState, promReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, broker, schedulerStore, informantServer)
+	globalState, promReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, broker, schedulerStore)
 	watchMetrics.MustRegister(promReg)
 
 	if r.Config.Billing != nil {
diff --git a/pkg/agent/globalstate.go b/pkg/agent/globalstate.go
index c20cc27a5..3095948ca 100644
--- a/pkg/agent/globalstate.go
+++ b/pkg/agent/globalstate.go
@@ -33,8 +33,6 @@ type agentState struct {
 	lock util.ChanMutex
 	pods map[util.NamespacedName]*podState
 
-	informantMuxServer *HttpMuxServer
-
 	// A base logger to pass around, so we can recreate the logger for a Runner on restart, without
 	// running the risk of leaking keys.
 	baseLogger *zap.Logger
@@ -53,12 +51,10 @@ func (r MainRunner) newAgentState(
 	podIP string,
 	broker *pubsub.Broker[schedwatch.WatchEvent],
 	schedulerStore *watch.Store[corev1.Pod],
-	informantMuxServer *HttpMuxServer,
 ) (*agentState, *prometheus.Registry) {
 	state := &agentState{
 		lock:               util.NewChanMutex(),
 		pods:               make(map[util.NamespacedName]*podState),
-		informantMuxServer: informantMuxServer,
 		baseLogger:         baseLogger,
 		config:             r.Config,
 		kubeClient:         r.KubeClient,
@@ -164,8 +160,8 @@ func (s *agentState) handleVMEventAdded(
 			state:          "", // Explicitly set state to empty so that the initial state update does not decrement
 			stateUpdatedAt: now,
 
-			startTime:                   now,
-			lastSuccessfulInformantComm: nil,
+			startTime:                 now,
+			lastSuccessfulMonitorComm: nil,
 		},
 	}
 
@@ -363,12 +359,10 @@ func (s *agentState) newRunner(vmInfo api.VmInfo, podName util.NamespacedName, p
 		lastMetrics:        nil,
 		scheduler:          nil,
-		server:             nil,
-		informant:          nil,
+		monitor:            nil,
 		computeUnit:        nil,
 		lastApproved:       nil,
 		lastSchedulerError: nil,
-		lastInformantError: nil,
 
 		backgroundWorkerCount: atomic.Int64{},
 		backgroundPanic:       make(chan error),
@@ -422,7 +416,7 @@ type podStatus struct {
 	endState          *podStatusEndState
 	previousEndStates []podStatusEndState
 
-	lastSuccessfulInformantComm *time.Time
+	lastSuccessfulMonitorComm *time.Time
 
 	// vmInfo stores the latest information about the VM, as given by the global VM watcher.
 	//
@@ -446,7 +440,7 @@ type podStatusDump struct {
 	EndState          *podStatusEndState  `json:"endState"`
 	PreviousEndStates []podStatusEndState `json:"previousEndStates"`
 
-	LastSuccessfulInformantComm *time.Time `json:"lastSuccessfulInformantComm"`
+	LastSuccessfulMonitorComm *time.Time `json:"lastSuccessfulMonitorComm"`
 
 	VMInfo api.VmInfo `json:"vmInfo"`
 
@@ -494,7 +488,7 @@ func (s *lockedPodStatus) update(global *agentState, with func(podStatus) podSta
 		case podStatusExitPanicked:
 			newState = runnerMetricStatePanicked
 		}
-	} else if newStatus.informantStuckAt(global.config).Before(now) {
+	} else if newStatus.monitorStuckAt(global.config).Before(now) {
 		newState = runnerMetricStateStuck
 	} else {
 		newState = runnerMetricStateOk
@@ -521,12 +515,12 @@ func (s *lockedPodStatus) update(global *agentState, with func(podStatus) podSta
 	s.podStatus = newStatus
 }
 
-// informantStuckAt returns the time at which the Runner will be marked "stuck"
-func (s podStatus) informantStuckAt(config *Config) time.Time {
-	startupGracePeriod := time.Second * time.Duration(config.Informant.UnhealthyStartupGracePeriodSeconds)
-	unhealthySilencePeriod := time.Second * time.Duration(config.Informant.UnhealthyAfterSilenceDurationSeconds)
+// monitorStuckAt returns the time at which the Runner will be marked "stuck"
+func (s podStatus) monitorStuckAt(config *Config) time.Time {
+	startupGracePeriod := time.Second * time.Duration(config.Monitor.UnhealthyStartupGracePeriodSeconds)
+	unhealthySilencePeriod := time.Second * time.Duration(config.Monitor.UnhealthyAfterSilenceDurationSeconds)
 
-	if s.lastSuccessfulInformantComm == nil {
+	if s.lastSuccessfulMonitorComm == nil {
 		start := s.startTime
 
 		// For endpoints, we should start the grace period from when the VM was *assigned* the
@@ -537,14 +531,14 @@ func (s podStatus) informantStuckAt(config *Config) time.Time {
 
 		return start.Add(startupGracePeriod)
 	} else {
-		return s.lastSuccessfulInformantComm.Add(unhealthySilencePeriod)
+		return s.lastSuccessfulMonitorComm.Add(unhealthySilencePeriod)
 	}
 }
 
 func (s *lockedPodStatus) periodicallyRefreshState(ctx context.Context, logger *zap.Logger, global *agentState) {
 	maxUpdateSeconds := util.Min(
-		global.config.Informant.UnhealthyStartupGracePeriodSeconds,
-		global.config.Informant.UnhealthyAfterSilenceDurationSeconds,
+		global.config.Monitor.UnhealthyStartupGracePeriodSeconds,
+		global.config.Monitor.UnhealthyAfterSilenceDurationSeconds,
 	)
 	// make maxTick a bit less than maxUpdateSeconds for the benefit of consistency and having
 	// relatively frequent log messages if things are stuck.
@@ -564,7 +558,7 @@ func (s *lockedPodStatus) periodicallyRefreshState(ctx context.Context, logger *
 		// the next point in time at which the state might have changed, so that we minimize the
 		// time between the VM meeting the conditions for being "stuck" and us recognizing it.
 		s.update(global, func(stat podStatus) podStatus {
-			stuckAt := stat.informantStuckAt(global.config)
+			stuckAt := stat.monitorStuckAt(global.config)
 			now := time.Now()
 			if stuckAt.Before(now) && stat.state != runnerMetricStateErrored && stat.state != runnerMetricStatePanicked {
 				if stat.endpointID != "" {
@@ -607,6 +601,6 @@ func (s *lockedPodStatus) dump() podStatusDump {
 		State:          s.state,
 		StateUpdatedAt: s.stateUpdatedAt,
 
-		LastSuccessfulInformantComm: s.lastSuccessfulInformantComm,
+		LastSuccessfulMonitorComm: s.lastSuccessfulMonitorComm,
 	}
 }
diff --git a/pkg/agent/httpmux.go b/pkg/agent/httpmux.go
deleted file mode 100644
index ba308a18d..000000000
--- a/pkg/agent/httpmux.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package agent
-
-import (
-	"fmt"
-	"net"
-	"net/http"
-	"net/url"
-	"strings"
-	"sync"
-
-	"go.uber.org/zap"
-)
-
-// HttpMuxServer provides a way to multiplex multiple http.ServeMux
-// instances on a single HTTP server, so that requests are routed to
-// the correct ServeMux based on a prefix at the beginning of the URL
-// path.
-//
-// For example, if you call RegisterMux("foo", myMux), then all requests to
-// "/foo/*" are routed to 'myMux' instance.
-//
-// (Why is this needed? You could register all the routes directly in
-// one giant http.ServeMux, but http.ServeMux doesn't provide any way
-// to unregister patterns.)
-type HttpMuxServer struct {
-	muxedServers map[string]*http.ServeMux
-	lock         sync.Mutex // protects muxedServers
-	logger       *zap.Logger
-}
-
-// Implements http.Handler
-func (s *HttpMuxServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	s.logger.Debug(fmt.Sprintf("received request: %s", r.URL.Path))
-
-	path, found := strings.CutPrefix(r.URL.Path, "/")
-	if !found {
-		w.WriteHeader(http.StatusBadRequest)
-		s.logger.Warn(
-			"invalid request (path doesn't start with '/')",
-			zap.String("path", r.URL.Path),
-		)
-		_, _ = w.Write([]byte("missing /"))
-		return
-	}
-
-	muxID, path, found := strings.Cut(path, "/")
-	if !found {
-		w.WriteHeader(http.StatusBadRequest)
-		s.logger.Warn(
-			"invalid request (no mux ID in path)",
-			zap.String("path", r.URL.Path),
-		)
-		_, _ = w.Write([]byte("request must start with mux ID"))
-		return
-	}
-
-	var subServer *http.ServeMux
-	func() {
-		s.lock.Lock()
-		defer s.lock.Unlock()
-		subServer, found = s.muxedServers[muxID]
-	}()
-	if !found {
-		w.WriteHeader(http.StatusNotAcceptable)
-		s.logger.Warn(
-			"could not route request, mux ID not found",
-			zap.String("path", r.URL.Path),
-			zap.String("muxID", muxID),
-		)
-		_, _ = w.Write([]byte("mux ID not found"))
-		return
-	}
-
-	// Change the path in the request, removing the muxID prefix.
-	//
-	// The documentation for ServeHTTP says that you should not
-	// modify the Request, so make a copy.
-	var newURL url.URL = *r.URL
-	newURL.Path = "/" + path
-	var newRequest http.Request = *r
-	newRequest.URL = &newURL
-
-	subServer.ServeHTTP(w, &newRequest)
-}
-
-func (s *HttpMuxServer) RegisterMux(muxID string, subServer *http.ServeMux) error {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	if s.muxedServers[muxID] != nil {
-		return fmt.Errorf("mux ID already in use")
-	}
-	s.muxedServers[muxID] = subServer
-	return nil
-}
-
-func (s *HttpMuxServer) UnregisterMux(muxID string) {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	delete(s.muxedServers, muxID)
-}
-
-// Create a new HttpMuxServer, listening on the given port
-func StartHttpMuxServer(
-	logger *zap.Logger,
-	port int,
-) (*HttpMuxServer, error) {
-	// Manually start the TCP listener so we can minimize errors in the background thread.
-	addr := net.TCPAddr{IP: net.IPv4zero, Port: port}
-	listener, err := net.ListenTCP("tcp", &addr)
-	if err != nil {
-		return nil, fmt.Errorf("Error binding to %v", addr)
-	}
-
-	muxServer := HttpMuxServer{
-		muxedServers: map[string]*http.ServeMux{},
-		lock:         sync.Mutex{},
-		logger:       logger.Named("mux-server"),
-	}
-
-	httpServer := &http.Server{
-		Handler: &muxServer,
-	}
-
-	// Main thread running the server.
-	go func() {
-		err := httpServer.Serve(listener)
-
-		// The Serve call should never return
-		panic(fmt.Errorf("muxed http server exited unexpectedly: %w", err))
-	}()
-	return &muxServer, nil
-}
diff --git a/pkg/agent/informant.go b/pkg/agent/informant.go
deleted file mode 100644
index a76269aa0..000000000
--- a/pkg/agent/informant.go
+++ /dev/null
@@ -1,1244 +0,0 @@
-package agent
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io"
-	"net/http"
-	"strconv"
-	"strings"
-	"time"
-
-	"github.com/google/uuid"
-	"github.com/tychoish/fun/srv"
-	"go.uber.org/zap"
-	"nhooyr.io/websocket"
-
-	"github.com/neondatabase/autoscaling/pkg/api"
-	"github.com/neondatabase/autoscaling/pkg/util"
-)
-
-// The autoscaler-agent currently supports v1.0 to v2.0 of the agent<->informant protocol.
-//
-// If you update either of these values, make sure to also update VERSIONING.md.
-const (
-	MinInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV1_0
-	MaxInformantProtocolVersion api.InformantProtoVersion = api.InformantProtoV2_0
-)
-
-type InformantServer struct {
-	// If the "informant" we're talking to is actually a monitor
-	informantIsMonitor bool
-
-	// The dispatcher we're using to connect to the monitor if informantIsMonitor
-	// is true
-	dispatcher *Dispatcher
-
-	// runner is the Runner currently responsible for this InformantServer. We must acquire its lock
-	// before making any updates to other fields of this struct
-	runner *Runner
-
-	// desc is the AgentDesc describing this VM informant server. This field is immutable.
-	desc api.AgentDesc
-
-	seqNum uint64
-	// receivedIDCheck is true if the server has received at least one successful request at the /id
-	// endpoint by the expected IP address of the VM
-	//
-	// This field is used to check for protocol violations (i.e. responding to /register without
-	// checking with /id), and *may* help prevent certain IP-spoofing based attacks - although the
-	// security implications are entirely speculation.
-	receivedIDCheck bool
-
-	// madeContact is true if any request to the VM informant could have interacted with it.
-	//
-	// If madeContact is false, then mode is guaranteed to be InformantServerUnconfirmed, so
-	// madeContact only needs to be set on /register requests (because all others require a
-	// successful register first).
-	//
-	// This field MUST NOT be updated without holding BOTH runner.lock and requestLock.
-	//
-	// This field MAY be read while holding EITHER runner.lock OR requestLock.
-	madeContact bool
-
-	// protoVersion gives the version of the agent<->informant protocol currently in use, if the
-	// server has been confirmed.
-	//
-	// In other words, this field is not nil if and only if mode is not InformantServerUnconfirmed.
-	protoVersion *api.InformantProtoVersion
-
-	// mode indicates whether the informant has marked the connection as resumed or not
-	//
-	// This field MUST NOT be updated without holding BOTH runner.lock AND requestLock.
-	//
-	// This field MAY be read while holding EITHER runner.lock OR requestLock.
-	mode InformantServerMode
-
-	// updatedInformant is signalled once, when the InformantServer's register request completes,
-	// and the value of runner.informant is updated.
-	updatedInformant util.CondChannelSender
-
-	// upscaleRequested is signalled whenever a valid request on /try-upscale is received, with at
-	// least one field set to true (i.e., at least one resource is being requested).
-	upscaleRequested util.CondChannelSender
-
-	// requestLock guards requests to the VM informant to make sure that only one request is being
-	// made at a time.
-	//
-	// If both requestLock and runner.lock are required, then requestLock MUST be acquired before
-	// runner.lock.
-	requestLock util.ChanMutex
-
-	// exitStatus holds some information about why the server exited
-	exitStatus *InformantServerExitStatus
-
-	// exit signals that the server should shut down, and sets exitStatus to status.
-	//
-	// This function MUST be called while holding runner.lock.
-	exit func(status InformantServerExitStatus)
-}
-
-type InformantServerMode string
-
-const (
-	InformantServerUnconfirmed InformantServerMode = "unconfirmed"
-	InformantServerSuspended   InformantServerMode = "suspended"
-	InformantServerRunning     InformantServerMode = "running"
-)
-
-// InformantServerState is the serializable state of the InformantServer, produced by calls to the
-// Runner's State() method.
-type InformantServerState struct {
-	Desc            api.AgentDesc              `json:"desc"`
-	SeqNum          uint64                     `json:"seqNum"`
-	ReceivedIDCheck bool                       `json:"receivedIDCheck"`
-	MadeContact     bool                       `json:"madeContact"`
-	ProtoVersion    *api.InformantProtoVersion `json:"protoVersion"`
-	Mode            InformantServerMode        `json:"mode"`
-	ExitStatus      *InformantServerExitStatus `json:"exitStatus"`
-}
-
-type InformantServerExitStatus struct {
-	// Err is the error, if any, that caused the server to exit. This is only non-nil when context
-	// used to start the server becomes canceled (i.e. the Runner is exiting).
-	Err error
-	// RetryShouldFix is true if simply retrying should resolve err. This is true when e.g. the
-	// informant responds with a 404 to a downscale or upscale request - it might've restarted, so
-	// we just need to re-register.
-	RetryShouldFix bool
-}
-
-// NewInformantServer starts an InformantServer, returning it and a signal receiver that will be
-// signalled when it exits.
-func NewInformantServer(
-	ctx context.Context,
-	logger *zap.Logger,
-	runner *Runner,
-	updatedInformant util.CondChannelSender,
-	upscaleRequested util.CondChannelSender,
-) (*InformantServer, util.SignalReceiver[struct{}], error) {
-
-	// Generate a new random "mux ID" that is used to distinguish requests to
-	// the informant port that come from this registration.
-	muxID := uuid.New()
-	serverAddr := fmt.Sprintf("%s:%d/%s", runner.global.podIP, runner.global.config.Informant.CallbackPort, muxID)
-	server := &InformantServer{
-		informantIsMonitor: false,
-		dispatcher:         nil,
-		runner:             runner,
-		desc: api.AgentDesc{
-			AgentID:         uuid.New(),
-			ServerAddr:      serverAddr,
-			MinProtoVersion: MinInformantProtocolVersion,
-			MaxProtoVersion: MaxInformantProtocolVersion,
-		},
-		seqNum:           0,
-		receivedIDCheck:  false,
-		madeContact:      false,
-		protoVersion:     nil,
-		mode:             InformantServerUnconfirmed,
-		updatedInformant: updatedInformant,
-		upscaleRequested: upscaleRequested,
-		requestLock:      util.NewChanMutex(),
-		exitStatus:       nil,
-		exit:             nil, // see below.
-	}
-
-	logger = logger.With(zap.Object("server", server.desc))
-	logger.Info("Starting Informant server")
-
-	mux := http.NewServeMux()
-	util.AddHandler(logger, mux, "/id", http.MethodGet, "struct{}", server.handleID)
-	util.AddHandler(logger, mux, "/resume", http.MethodPost, "ResumeAgent", server.handleResume)
-	util.AddHandler(logger, mux, "/suspend", http.MethodPost, "SuspendAgent", server.handleSuspend)
-	util.AddHandler(logger, mux, "/try-upscale", http.MethodPost, "MoreResourcesRequest", server.handleTryUpscale)
-
-	sendFinished, recvFinished := util.NewSingleSignalPair[struct{}]()
-	backgroundCtx, cancelBackground := context.WithCancel(ctx)
-
-	// note: docs for server.exit guarantee this function is called while holding runner.lock.
-	server.exit = func(status InformantServerExitStatus) {
-		sendFinished.Send(struct{}{})
-		cancelBackground()
-
-		// Set server.exitStatus if isn't already
-		if server.exitStatus == nil {
-			server.exitStatus = &status
-			logFunc := logger.Warn
-			if status.RetryShouldFix {
-				logFunc = logger.Info
-			}
-
-			logFunc("Informant server exiting", zap.Bool("retry", status.RetryShouldFix), zap.Error(status.Err))
-		}
-
-		if server.informantIsMonitor {
-			server.dispatcher.conn.Close(websocket.StatusInternalError, "informant exit")
-			logger.Info("Successfully closed websocket connection")
-		} else if server.madeContact {
-			// Stop accepting HTTP requests for this registration
-			runner.global.informantMuxServer.UnregisterMux(muxID.String())
-			// only unregister the server if we could have plausibly contacted the informant
-			runner.spawnBackgroundWorker(srv.GetBaseContext(ctx), logger, "InformantServer unregister", func(_ context.Context, logger *zap.Logger) {
-				// we want shutdown to (potentially) live longer than the request which
-				// made it, but having a timeout is still good.
-				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-				defer cancel()
-
-				if err := server.unregisterFromInformant(ctx, logger); err != nil {
-					logger.Warn("Error unregistering", zap.Error(err))
-				}
-			})
-		}
-	}
-
-	// Register with multiplexer, so that requests with this muxID are routed to
-	// this InformantServer instance.
-	if err := runner.global.informantMuxServer.RegisterMux(muxID.String(), mux); err != nil {
-		logger.Error("Could not register to muxed server", zap.Error(err))
-		return nil, util.SignalReceiver[struct{}]{}, fmt.Errorf("Could not register informant service with the multiplexer: %w", err)
-	}
-
-	// Deadlock checker for server.requestLock
-	//
-	// FIXME: make these timeouts/delays separately defined constants, or configurable
-	deadlockChecker := server.requestLock.DeadlockChecker(5*time.Second, time.Second)
-	runner.spawnBackgroundWorker(backgroundCtx, logger, "InformantServer deadlock checker", ignoreLogger(deadlockChecker))
-
-	// Thread waiting for the context to be canceled so we can use it to shut down the server
-	runner.spawnBackgroundWorker(ctx, logger, "InformantServer shutdown waiter", func(context.Context, *zap.Logger) {
-		// Wait until parent context OR server's context is done.
-		<-backgroundCtx.Done()
-		server.exit(InformantServerExitStatus{Err: nil, RetryShouldFix: false})
-	})
-
-	runner.spawnBackgroundWorker(backgroundCtx, logger, "InformantServer health-checker", func(c context.Context, logger *zap.Logger) {
-		// FIXME: make this duration configurable
-		ticker := time.NewTicker(5 * time.Second)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-c.Done():
-				return
-			case <-ticker.C:
-			}
-
-			// Are we talking to a monitor?
-			var isMonitor bool
-			func() {
-				server.requestLock.Lock()
-				defer server.requestLock.Unlock()
-				if server.informantIsMonitor {
-					if err := server.MonitorHealthCheck(c, logger); err != nil {
-						logger.Warn("Monitor health check failed", zap.Error(err))
-					}
-					isMonitor = true
-				} else {
-					isMonitor = false
-				}
-			}()
-			if isMonitor {
-				continue
-			}
-
-			// Otherwise do normal informant health checks
-			var done bool
-			func() {
-				server.requestLock.Lock()
-				defer server.requestLock.Unlock()
-
-				// If we've already registered with the informant, and it doesn't support health
-				// checks, exit.
-				if server.protoVersion != nil && !server.protoVersion.AllowsHealthCheck() {
-					logger.Info("Aborting future informant health checks because it does not support them")
-					done = true
-					return
-				}
-
-				if _, err := server.HealthCheck(c, logger); err != nil {
-					logger.Warn("Informant health check failed", zap.Error(err))
-				}
-			}()
-			if done {
-				return
-			}
-		}
-	})
-
-	return server, recvFinished, nil
-}
-
-var (
-	InformantServerAlreadyExitedError error = errors.New("Informant server has already exited")
-	InformantServerSuspendedError     error = errors.New("Informant server is currently suspended")
-	InformantServerUnconfirmedError   error = errors.New("Informant server has not yet been confirmed")
-	InformantServerNotCurrentError    error = errors.New("Informant server has been replaced")
-)
-
-// IsNormalInformantError returns true if the error is one of the "expected" errors that can occur
-// in valid exchanges - due to unavoidable raciness or otherwise.
-func IsNormalInformantError(err error) bool {
-	return errors.Is(err, InformantServerAlreadyExitedError) ||
-		errors.Is(err, InformantServerSuspendedError) ||
-		errors.Is(err, InformantServerUnconfirmedError) ||
-		errors.Is(err, InformantServerNotCurrentError)
-}
-
-// valid checks if the InformantServer is good to use for communication, returning an error if not
-//
-// This method can return errors for a number of unavoidably-racy protocol states - errors from this
-// method should be handled as unusual, but not unexpected. Any error returned will be one of
-// InformantServer{AlreadyExited,Suspended,Confirmed}Error.
-//
-// This method MUST be called while holding s.runner.lock.
-func (s *InformantServer) valid() error {
-	if s.exitStatus != nil {
-		return InformantServerAlreadyExitedError
-	}
-
-	switch s.mode {
-	case InformantServerRunning:
-		// all good; one more check
-	case InformantServerUnconfirmed:
-		return InformantServerUnconfirmedError
-	case InformantServerSuspended:
-		return InformantServerSuspendedError
-	default:
-		panic(fmt.Errorf("Unexpected InformantServerMode %q", s.mode))
-	}
-
-	if s.runner.server != s {
-		return InformantServerNotCurrentError
-	}
-	return nil
-}
-
-// ExitStatus returns the InformantServerExitStatus associated with the server, if it has been
-// instructed to exit
-//
-// This method MUST NOT be called while holding s.runner.lock.
-func (s *InformantServer) ExitStatus() *InformantServerExitStatus {
-	s.runner.lock.Lock()
-	defer s.runner.lock.Unlock()
-
-	return s.exitStatus
-}
-
-// setLastInformantError is a helper method to abbreviate setting the Runner's lastInformantError
-// field. If runnerLocked is true, s.runner.lock will be acquired.
-//
-// This method MUST be called while holding s.requestLock AND EITHER holding s.runner.lock OR
-// runnerLocked MUST be true.
-func (s *InformantServer) setLastInformantError(err error, runnerLocked bool) {
-	if !runnerLocked {
-		s.runner.lock.Lock()
-		defer s.runner.lock.Unlock()
-	}
-
-	if s.runner.server == s {
-		s.runner.lastInformantError = err
-	}
-}
-
-// RegisterWithInformant sends a /register request to the VM Informant
-//
-// If called after a prior success, this method will panic. If the server has already exited, this
-// method will return InformantServerAlreadyExitedError.
-//
-// On certain errors, this method will force the server to exit. This can be checked by calling
-// s.ExitStatus() and checking for a non-nil result.
-//
-// This method MUST NOT be called while holding s.requestLock OR s.runner.lock.
-func (s *InformantServer) RegisterWithInformant(ctx context.Context, logger *zap.Logger) error {
-	logger = logger.With(zap.Object("server", s.desc))
-
-	s.requestLock.Lock()
-	defer s.requestLock.Unlock()
-
-	// Check the current state:
-	err := func() error {
-		s.runner.lock.Lock()
-		defer s.runner.lock.Unlock()
-
-		switch s.mode {
-		case InformantServerUnconfirmed:
-			// good; this is what we're expecting
-		case InformantServerRunning, InformantServerSuspended:
-			panic(fmt.Errorf("Register called while InformantServer is already registered (mode = %q)", s.mode))
-		default:
-			panic(fmt.Errorf("Unexpected InformantServerMode %q", s.mode))
-		}
-
-		if s.exitStatus != nil {
-			err := InformantServerAlreadyExitedError
-			s.setLastInformantError(err, true)
-			return err
-		}
-
-		return nil
-	}()
-	if err != nil {
-		return err
-	}
-
-	// Make the request:
-	timeout := time.Second * time.Duration(s.runner.global.config.Informant.RegisterTimeoutSeconds)
-	resp, statusCode, err := doInformantRequest[api.AgentDesc, api.InformantDesc](
-		ctx, logger, s, timeout, http.MethodPost, "/register", &s.desc,
-	)
-
-	connectedToMonitor := false
-	maybeMadeContact := statusCode != 0 || ctx.Err() != nil
-
-	if statusCode == 404 {
-		addr := fmt.Sprintf(
-			"ws://%s:%d/monitor",
-			s.runner.podIP,
-			s.runner.global.config.Informant.ServerPort,
-		)
-		logger.Info(
-			"received 404 from informant's /register endpoint; connecting to monitor",
-			zap.String("addr", addr),
-		)
-		// pre-declare disp so that err gets assigned to err from enclosing scope,
-		// overwriting original request error.
- var disp *Dispatcher - disp, err = NewDispatcher(ctx, logger, addr, s) - // If the error is not nil, it will get handled below - if err == nil { - // Acquire the lock late so as not to hold it will connecting to the - // dispatcher - func() { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - connectedToMonitor = true - s.informantIsMonitor = true - s.dispatcher = disp - s.mode = InformantServerRunning - s.updatedInformant.Send() - if s.runner.server == s { - s.runner.informant = &api.InformantDesc{ - ProtoVersion: MaxInformantProtocolVersion, - MetricsMethod: api.InformantMetricsMethod{ - Prometheus: &api.MetricsMethodPrometheus{ - Port: 9100, - }, - }, - } - s.runner.spawnBackgroundWorker( - ctx, - disp.logger, - "dispatcher message handler", - ignoreLogger(disp.run), - ) - } else if s.exitStatus != nil { - // we exited -> close ws - disp.conn.Close(websocket.StatusInternalError, "informant exited") - - } else { - // updated -> close ws - disp.conn.Close( - websocket.StatusInternalError, - "runner is talking to a different informant", - ) - } - }() - } - } - - // Do more stuff with the lock acquired: - func() { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - // Record whether we might've contacted the informant: - s.madeContact = maybeMadeContact - - if err != nil { - s.setLastInformantError(fmt.Errorf("Register request failed: %w", err), true) - - // If the informant responds that it's our fault, or it had an internal failure, we know - // that: - // 1. Neither should happen under normal operation, and - // 2. Restarting the server is *more likely* to fix it than continuing - // We shouldn't *assume* that restarting will actually fix it though, so we'll still set - // RetryShouldFix = false. - if 400 <= statusCode && statusCode <= 599 { - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: false, - }) - } - } - }() - - if connectedToMonitor { - return nil - } - - if err != nil { - return err // the errors returned by doInformantRequest are descriptive enough. - } - - if err := validateInformantDesc(&s.desc, resp); err != nil { - err = fmt.Errorf("Received bad InformantDesc: %w", err) - s.setLastInformantError(err, false) - return err - } - - // Now that we know it's valid, set s.runner.informant ... - err = func() error { - // ... but only if the server is still current. We're ok setting it if the server isn't - // running, because it's good to have the information there. 
- s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - logger.Info( - "Informant server mode updated", - zap.String("action", "register"), - zap.String("oldMode", string(s.mode)), - zap.String("newMode", string(InformantServerSuspended)), - ) - - s.mode = InformantServerSuspended - s.protoVersion = &resp.ProtoVersion - - if s.runner.server == s { - oldInformant := s.runner.informant - s.runner.informant = resp - s.updatedInformant.Send() // signal we've changed the informant - - if oldInformant == nil { - logger.Info("Registered with informant", zap.Any("informant", *resp)) - } else if *oldInformant != *resp { - logger.Info( - "Re-registered with informant, InformantDesc changed", - zap.Any("oldInformant", *oldInformant), - zap.Any("informant", *resp), - ) - } else { - logger.Info("Re-registered with informant; InformantDesc unchanged", zap.Any("informant", *oldInformant)) - } - } else { - logger.Warn("Registering with informant completed but the server has already been replaced") - } - - // we also want to do a quick protocol check here as well - if !s.receivedIDCheck { - // protocol violation - err := errors.New("Informant responded to /register with 200 without requesting /id") - s.setLastInformantError(fmt.Errorf("Protocol violation: %w", err), true) - logger.Error("Protocol violation", zap.Error(err)) - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: false, - }) - return errors.New("Protocol violation") // we already logged it; don't double-log a long message - } - - return nil - }() - - if err != nil { - return err - } - - // Record that this request was handled without error - s.setLastInformantError(nil, false) - return nil -} - -// validateInformantDesc checks that the provided api.InformantDesc is valid and matches with an -// InformantServer's api.AgentDesc -func validateInformantDesc(server *api.AgentDesc, informant *api.InformantDesc) error { - // To quote the docs for api.InformantDesc.ProtoVersion: - // - // > If the VM informant does not use a protocol version within [the agent's] bounds, then it - // > MUST respond with an error status code. - // - // So if we're asked to validate the response, mismatch *should* have already been handled. - goodProtoVersion := server.MinProtoVersion <= informant.ProtoVersion && - informant.ProtoVersion <= server.MaxProtoVersion - - if !goodProtoVersion { - return fmt.Errorf( - "Unexpected protocol version: should be between %d and %d, but got %d", - server.MinProtoVersion, server.MaxProtoVersion, informant.ProtoVersion, - ) - } - - // To quote the docs for api.InformantMetricsMethod: - // - // > At least one method *must* be provided in an InformantDesc, and more than one method gives - // > the autoscaler-agent freedom to choose. - // - // We just need to check that there aren't none. - hasMetricsMethod := informant.MetricsMethod.Prometheus != nil - if !hasMetricsMethod { - return errors.New("No known metrics method given") - } - - return nil -} - -// unregisterFromInformant is an internal-ish function that sends an /unregister request to the VM -// informant -// -// Because sending an /unregister request is generally out of courtesy on exit, this method is more -// permissive about server state, and is typically called with a different Context from what would -// normally be expected. -// -// This method is only expected to be called by s.exit; calling this method before s.exitStatus has -// been set will likely cause the server to restart. 
-// -// This method MUST NOT be called while holding s.requestLock OR s.runner.lock. -func (s *InformantServer) unregisterFromInformant(ctx context.Context, logger *zap.Logger) error { - // note: Because this method is typically called during shutdown, we don't set - // s.runner.lastInformantError or call s.exit, even though other request helpers do. - - logger = logger.With(zap.Object("server", s.desc)) - - s.requestLock.Lock() - defer s.requestLock.Unlock() - - logger.Info("Sending unregister request to informant") - - // Make the request: - timeout := time.Second * time.Duration(s.runner.global.config.Informant.RegisterTimeoutSeconds) - resp, _, err := doInformantRequest[api.AgentDesc, api.UnregisterAgent]( - ctx, logger, s, timeout, http.MethodDelete, "/unregister", &s.desc, - ) - if err != nil { - return err // the errors returned by doInformantRequest are descriptive enough. - } - - logger.Info("Unregister request successful", zap.Any("response", *resp)) - return nil -} - -// doInformantRequest makes a single HTTP request to the VM informant, doing only the validation -// required to JSON decode the response -// -// The returned int gives the status code of the response. It is possible for a response with status -// 200 to still yield an error - either because of a later IO failure or bad JSON. -// -// If an error occurs before we get a response, the status code will be 0. -// -// This method MUST be called while holding s.requestLock. If not, the program will silently violate -// the protocol guarantees. -func doInformantRequest[Q any, R any]( - ctx context.Context, - logger *zap.Logger, - s *InformantServer, - timeout time.Duration, - method string, - path string, - reqData *Q, -) (_ *R, statusCode int, _ error) { - result := "" - defer func() { - s.runner.global.metrics.informantRequestsOutbound.WithLabelValues(path, result).Inc() - }() - - reqBody, err := json.Marshal(reqData) - if err != nil { - return nil, statusCode, fmt.Errorf("Error encoding request JSON: %w", err) - } - - reqCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - url := s.informantURL(path) - request, err := http.NewRequestWithContext(reqCtx, method, url, bytes.NewReader(reqBody)) - if err != nil { - return nil, statusCode, fmt.Errorf("Error building request to %q: %w", url, err) - } - request.Header.Set("content-type", "application/json") - - logger.Info("Sending informant request", zap.String("url", url), zap.Any("request", reqData)) - - response, err := http.DefaultClient.Do(request) - if err != nil { - result = fmt.Sprintf("[error doing request: %s]", util.RootError(err)) - return nil, statusCode, fmt.Errorf("Error doing request: %w", err) - } - defer response.Body.Close() - - statusCode = response.StatusCode - result = strconv.Itoa(statusCode) - - respBody, err := io.ReadAll(response.Body) - if err != nil { - return nil, statusCode, fmt.Errorf("Error reading body for response: %w", err) - } - - if statusCode != 200 { - return nil, statusCode, fmt.Errorf( - "Received response status %d body %q", statusCode, string(respBody), - ) - } - - var respData R - if err := json.Unmarshal(respBody, &respData); err != nil { - return nil, statusCode, fmt.Errorf("Bad JSON response: %w", err) - } - - logger.Info("Got informant response", zap.String("url", url), zap.Any("response", respData)) - - return &respData, statusCode, nil -} - -// fetchAndIncrementSequenceNumber increments the sequence number and returns it -// -// This method MUST be called while holding s.runner.lock. 
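// On the helper above: parameterizing doInformantRequest over both the request
// type Q and the response type R keeps each endpoint's call site to a single
// expression plus its two types. For example, the /register call earlier in
// this file reads:
//
//	resp, statusCode, err := doInformantRequest[api.AgentDesc, api.InformantDesc](
//		ctx, logger, s, timeout, http.MethodPost, "/register", &s.desc,
//	)
//
// with the JSON decoding target pinned to api.InformantDesc at the call site.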
-func (s *InformantServer) incrementSequenceNumber() uint64 { - s.seqNum += 1 - return s.seqNum -} - -// informantURL creates a string representing the URL for a request to the VM informant, given the -// path to use -func (s *InformantServer) informantURL(path string) string { - if !strings.HasPrefix(path, "/") { - panic(errors.New("informant URL path must start with '/'")) - } - - ip := s.runner.podIP - port := s.runner.global.config.Informant.ServerPort - return fmt.Sprintf("http://%s:%d/%s", ip, port, path[1:]) -} - -// handleID handles a request on the server's /id endpoint. This method should not be called outside -// of that context. -// -// Returns: response body (if successful), status code, error (if unsuccessful) -func (s *InformantServer) handleID(ctx context.Context, _ *zap.Logger, body *struct{}) (_ *api.AgentIdentificationMessage, code int, _ error) { - defer func() { - s.runner.global.metrics.informantRequestsInbound.WithLabelValues("/id", strconv.Itoa(code)).Inc() - }() - - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - s.receivedIDCheck = true - - if s.exitStatus != nil { - return nil, 404, errors.New("Server has already exited") - } - - // Update our record of the last successful time we heard from the informant, if the server is - // currently enabled. This allows us to detect cases where the informant is not currently - // communicating back to the agent - OR when the informant never /resume'd the agent. - if s.mode == InformantServerRunning { - s.runner.status.update(s.runner.global, func(s podStatus) podStatus { - now := time.Now() - s.lastSuccessfulInformantComm = &now - return s - }) - } - - return &api.AgentIdentificationMessage{ - Data: api.AgentIdentification{AgentID: s.desc.AgentID}, - SequenceNumber: s.incrementSequenceNumber(), - }, 200, nil -} - -// handleResume handles a request on the server's /resume endpoint. This method should not be called -// outside of that context. -// -// Returns: response body (if successful), status code, error (if unsuccessful) -func (s *InformantServer) handleResume( - ctx context.Context, logger *zap.Logger, body *api.ResumeAgent, -) (_ *api.AgentIdentificationMessage, code int, _ error) { - defer func() { - s.runner.global.metrics.informantRequestsInbound.WithLabelValues("/resume", strconv.Itoa(code)).Inc() - }() - - if body.ExpectedID != s.desc.AgentID { - logger.Warn("Request AgentID not found, server has a different one") - return nil, 404, fmt.Errorf("AgentID %q not found", body.ExpectedID) - } - - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - if s.exitStatus != nil { - return nil, 404, errors.New("Server has already exited") - } - - // FIXME: Our handling of the protocol here is racy (because we might receive a /resume request - // before we've processed the response from our /register request). However, that's *probably* - // actually an issue with the protocol itself, rather than our handling. - - switch s.mode { - case InformantServerSuspended: - s.mode = InformantServerRunning - logger.Info( - "Informant server mode updated", - zap.String("action", "resume"), - zap.String("oldMode", string(InformantServerSuspended)), - zap.String("newMode", string(InformantServerRunning)), - ) - case InformantServerRunning: - internalErr := errors.New("Got /resume request for server, but it is already running") - logger.Warn("Protocol violation", zap.Error(internalErr)) - - // To be nice, we'll restart the server. We don't want to make a temporary error permanent. 
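// For context on the exit just below: RetryShouldFix separates failures that a
// server restart can plausibly clear (like this protocol hiccup) from ones it
// cannot. The serve loop keys its retry delay on that flag - roughly:
//
//	if !exitStatus.RetryShouldFix {
//		normalRetryWait = time.After(normalWait) // long wait for "real" errors
//	}
//	// otherwise only the loop's minimum restart delay applies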
- s.exit(InformantServerExitStatus{ - Err: internalErr, - RetryShouldFix: true, - }) - - return nil, 400, errors.New("Cannot resume agent that is already running") - case InformantServerUnconfirmed: - internalErr := errors.New("Got /resume request for server, but it is unconfirmed") - logger.Warn("Protocol violation", zap.Error(internalErr)) - - // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ - Err: internalErr, - RetryShouldFix: true, - }) - - return nil, 400, errors.New("Cannot resume agent that is not yet registered") - default: - panic(fmt.Errorf("Unexpected InformantServerMode %q", s.mode)) - } - - return &api.AgentIdentificationMessage{ - Data: api.AgentIdentification{AgentID: s.desc.AgentID}, - SequenceNumber: s.incrementSequenceNumber(), - }, 200, nil -} - -// handleSuspend handles a request on the server's /suspend endpoint. This method should not be -// called outside of that context. -// -// Returns: response body (if successful), status code, error (if unsuccessful) -func (s *InformantServer) handleSuspend( - ctx context.Context, logger *zap.Logger, body *api.SuspendAgent, -) (_ *api.AgentIdentificationMessage, code int, _ error) { - defer func() { - s.runner.global.metrics.informantRequestsInbound.WithLabelValues("/suspend", strconv.Itoa(code)).Inc() - }() - - if body.ExpectedID != s.desc.AgentID { - logger.Warn("Request AgentID not found, server has a different one") - return nil, 404, fmt.Errorf("AgentID %q not found", body.ExpectedID) - } - - s.runner.lock.Lock() - locked := true - defer func() { - if locked { - s.runner.lock.Unlock() - } - }() - - if s.exitStatus != nil { - return nil, 404, errors.New("Server has already exited") - } - - switch s.mode { - case InformantServerRunning: - s.mode = InformantServerSuspended - logger.Info( - "Informant server mode updated", - zap.String("action", "suspend"), - zap.String("oldMode", string(InformantServerRunning)), - zap.String("newMode", string(InformantServerSuspended)), - ) - case InformantServerSuspended: - internalErr := errors.New("Got /suspend request for server, but it is already suspended") - logger.Warn("Protocol violation", zap.Error(internalErr)) - - // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ - Err: internalErr, - RetryShouldFix: true, - }) - - return nil, 400, errors.New("Cannot suspend agent that is already suspended") - case InformantServerUnconfirmed: - internalErr := errors.New("Got /suspend request for server, but it is unconfirmed") - logger.Warn("Protocol violation", zap.Error(internalErr)) - - // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ - Err: internalErr, - RetryShouldFix: true, - }) - - return nil, 400, errors.New("Cannot suspend agent that is not yet registered") - } - - locked = false - s.runner.lock.Unlock() - - // Acquire s.runner.requestLock so that when we return, we can guarantee that any future - // requests to NeonVM or the scheduler will first observe that the informant is suspended and - // exit early, before actually making the request. 
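// The TryLock/Unlock pair below uses the mutex purely as a barrier: nothing is
// protected while it is held. Because every request to NeonVM or the scheduler
// holds requestLock for its full duration, briefly acquiring it here guarantees
// that all requests that were mid-flight at suspension have finished, and any
// later ones will first observe the suspended mode. The idiom in isolation:
//
//	if err := mu.TryLock(ctx); err != nil {
//		return err // context expired while waiting
//	}
//	mu.Unlock() // every earlier critical section has now completed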
- if err := s.runner.requestLock.TryLock(ctx); err != nil { - err = fmt.Errorf("Context expired while trying to acquire requestLock: %w", err) - logger.Error("Failed to synchronize on requestLock", zap.Error(err)) - return nil, 500, err - } - s.runner.requestLock.Unlock() // don't actually hold the lock, we're just using it as a barrier. - - return &api.AgentIdentificationMessage{ - Data: api.AgentIdentification{AgentID: s.desc.AgentID}, - SequenceNumber: s.incrementSequenceNumber(), - }, 200, nil -} - -// handleTryUpscale handles a request on the server's /try-upscale endpoint. This method should not -// be called outside of that context. -// -// Returns: response body (if successful), status code, error (if unsuccessful) -func (s *InformantServer) handleTryUpscale( - ctx context.Context, - logger *zap.Logger, - body *api.MoreResourcesRequest, -) (_ *api.AgentIdentificationMessage, code int, _ error) { - defer func() { - s.runner.global.metrics.informantRequestsInbound.WithLabelValues("/upscale", strconv.Itoa(code)).Inc() - }() - - if body.ExpectedID != s.desc.AgentID { - logger.Warn("Request AgentID not found, server has a different one") - return nil, 404, fmt.Errorf("AgentID %q not found", body.ExpectedID) - } - - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - if s.exitStatus != nil { - return nil, 404, errors.New("Server has already exited") - } - - switch s.mode { - case InformantServerRunning: - if !s.protoVersion.HasTryUpscale() { - err := fmt.Errorf("/try-upscale not supported for protocol version %v", *s.protoVersion) - return nil, 400, err - } - - if body.MoreResources.Cpu || body.MoreResources.Memory { - s.upscaleRequested.Send() - } else { - logger.Warn("Received try-upscale request that has no resources selected") - } - - logger.Info( - "Updating requested upscale", - zap.Any("oldRequested", s.runner.requestedUpscale), - zap.Any("newRequested", body.MoreResources), - ) - s.runner.requestedUpscale = body.MoreResources - - return &api.AgentIdentificationMessage{ - Data: api.AgentIdentification{AgentID: s.desc.AgentID}, - SequenceNumber: s.incrementSequenceNumber(), - }, 200, nil - case InformantServerSuspended: - internalErr := errors.New("Got /try-upscale request for server, but server is suspended") - logger.Warn("Protocol violation", zap.Error(internalErr)) - - // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ - Err: internalErr, - RetryShouldFix: true, - }) - - return nil, 400, errors.New("Cannot process upscale while suspended") - case InformantServerUnconfirmed: - internalErr := errors.New("Got /try-upscale request for server, but server is suspended") - logger.Warn("Protocol violation", zap.Error(internalErr)) - - // To be nice, we'll restart the server. We don't want to make a temporary error permanent. - s.exit(InformantServerExitStatus{ - Err: internalErr, - RetryShouldFix: true, - }) - - return nil, 400, errors.New("Cannot process upscale while unconfirmed") - default: - panic(fmt.Errorf("unexpected server mode: %q", s.mode)) - } -} - -// HealthCheck makes a request to the informant's /health-check endpoint, using the server's ID. -// -// This method MUST be called while holding i.server.requestLock AND NOT i.server.runner.lock. 
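// Stepping back from the /try-upscale handler above: requestedUpscale is a pair
// of booleans that doVMUpdate later clears field-wise once an increase covers
// it, via requestedUpscale.And(upscaled.IncreaseFrom(current).Not()). A sketch
// of the boolean algebra that expression assumes (field names match
// api.MoreResources; the method bodies here are illustrative):
//
//	func (m MoreResources) Not() MoreResources {
//		return MoreResources{Cpu: !m.Cpu, Memory: !m.Memory}
//	}
//
//	func (m MoreResources) And(o MoreResources) MoreResources {
//		return MoreResources{Cpu: m.Cpu && o.Cpu, Memory: m.Memory && o.Memory}
//	}
//
// so a requested dimension stays set exactly until scaling satisfies it.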
-func (s *InformantServer) HealthCheck(ctx context.Context, logger *zap.Logger) (*api.InformantHealthCheckResp, error) { - err := func() error { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - return s.valid() - }() - // NB: we want to continue to perform health checks even if the informant server is not properly - // available for *normal* use. - // - // We only need to check for InformantServerSuspendedError because - // InformantServerUnconfirmedError will be handled by the retryRegister loop in - // serveInformantLoop. - if err != nil && !errors.Is(err, InformantServerSuspendedError) { - return nil, err - } - - logger = logger.With(zap.Object("server", s.desc)) - - timeout := time.Second * time.Duration(s.runner.global.config.Informant.RequestTimeoutSeconds) - id := api.AgentIdentification{AgentID: s.desc.AgentID} - - logger.Info("Sending health-check", zap.Any("id", id)) - resp, statusCode, err := doInformantRequest[api.AgentIdentification, api.InformantHealthCheckResp]( - ctx, logger, s, timeout, http.MethodPut, "/health-check", &id, - ) - if err != nil { - func() { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - s.setLastInformantError(fmt.Errorf("Health-check request failed: %w", err), true) - - if 400 <= statusCode && statusCode <= 599 { - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: statusCode == 404, - }) - } - }() - return nil, err - } - - logger.Info("Received OK health-check result") - return resp, nil -} - -// Downscale makes a request to the informant's /downscale endpoint with the api.Resources -// -// This method MUST be called while holding s.requestLock AND NOT s.runner.lock -func (s *InformantServer) Downscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error) { - err := func() error { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - return s.valid() - }() - if err != nil { - return nil, err - } - - logger = logger.With(zap.Object("server", s.desc)) - - logger.Info("Sending downscale", zap.Object("target", to)) - - timeout := time.Second * time.Duration(s.runner.global.config.Informant.DownscaleTimeoutSeconds) - id := api.AgentIdentification{AgentID: s.desc.AgentID} - rawResources := to.ConvertToRaw(s.runner.vm.Mem.SlotSize) - - var statusCode int - var resp *api.DownscaleResult - if s.protoVersion.SignsResourceUpdates() { - signedRawResources := api.ResourceMessage{RawResources: rawResources, Id: id} - reqData := api.AgentResourceMessage{Data: signedRawResources, SequenceNumber: s.incrementSequenceNumber()} - resp, statusCode, err = doInformantRequest[api.AgentResourceMessage, api.DownscaleResult]( - ctx, logger, s, timeout, http.MethodPut, "/downscale", &reqData, - ) - } else { - resp, statusCode, err = doInformantRequest[api.RawResources, api.DownscaleResult]( - ctx, logger, s, timeout, http.MethodPut, "/downscale", &rawResources, - ) - } - if err != nil { - func() { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - s.setLastInformantError(fmt.Errorf("Downscale request failed: %w", err), true) - - if 400 <= statusCode && statusCode <= 599 { - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: statusCode == 404, - }) - } - }() - return nil, err - } - - logger.Info("Received downscale result") // already logged by doInformantRequest - return resp, nil -} - -// This method MUST be called while holding s.requestLock AND NOT s.runner.lock -func (s *InformantServer) Upscale(ctx context.Context, logger *zap.Logger, to api.Resources) error { - err := func() error { - 
s.runner.lock.Lock() - defer s.runner.lock.Unlock() - return s.valid() - }() - if err != nil { - return err - } - - logger = logger.With(zap.Object("server", s.desc)) - - logger.Info("Sending upscale", zap.Object("target", to)) - - timeout := time.Second * time.Duration(s.runner.global.config.Informant.DownscaleTimeoutSeconds) - id := api.AgentIdentification{AgentID: s.desc.AgentID} - rawResources := to.ConvertToRaw(s.runner.vm.Mem.SlotSize) - - var statusCode int - if s.protoVersion.SignsResourceUpdates() { - signedRawResources := api.ResourceMessage{RawResources: rawResources, Id: id} - reqData := api.AgentResourceMessage{Data: signedRawResources, SequenceNumber: s.incrementSequenceNumber()} - _, statusCode, err = doInformantRequest[api.AgentResourceMessage, struct{}]( - ctx, logger, s, timeout, http.MethodPut, "/upscale", &reqData, - ) - } else { - _, statusCode, err = doInformantRequest[api.RawResources, struct{}]( - ctx, logger, s, timeout, http.MethodPut, "/upscale", &rawResources, - ) - } - if err != nil { - func() { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - s.setLastInformantError(fmt.Errorf("Downscale request failed: %w", err), true) - - if 400 <= statusCode && statusCode <= 599 { - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: statusCode == 404, - }) - } - }() - return err - } - - logger.Info("Received successful upscale result") - return nil -} - -// MonitorHealthCheck is the equivalent of (*InformantServer).HealthCheck for -// when we're connected to a monitor. -// -// # This method MUST be called while holding s.requestLock AND NOT s.runner.lock -// -// *Note*: Locking requestLock is not technically necessary, but it allows for better -// serialization. For example, if this function exits, we know that no other -// dispatcher calls will be made. -func (s *InformantServer) MonitorHealthCheck(ctx context.Context, logger *zap.Logger) error { - timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds) - - _, err := s.dispatcher.Call(ctx, timeout, "HealthCheck", api.HealthCheck{}) - - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - - if err != nil { - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: false, - }) - return err - } - - // Update our record of the last successful time we heard from the monitor. This allows us to - // detect cases where the communication has broken down. - s.runner.status.update(s.runner.global, func(s podStatus) podStatus { - now := time.Now() - s.lastSuccessfulInformantComm = &now - return s - }) - - return nil -} - -// MonitorUpscale is the equivalent of (*InformantServer).Upscale for -// when we're connected to a monitor. -// -// # This method MUST be called while holding s.requestLock AND NOT s.runner.lock -// -// *Note*: Locking requestLock is not technically necessary, but it allows for -// better serialization. For example, if this function exits, we know that no -// other dispatcher calls will be made. 
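// MonitorHealthCheck above and the Monitor{Upscale,Downscale} helpers below all
// reduce to the same shape: a single typed, timeout-bounded round-trip on the
// websocket Dispatcher. Roughly (this is the Call signature as used in this
// file; the runner.go replacements later in this patch also thread a
// *zap.Logger through Call):
//
//	timeout := time.Second * time.Duration(config.Monitor.ResponseTimeoutSeconds)
//	if _, err := s.dispatcher.Call(ctx, timeout, "HealthCheck", api.HealthCheck{}); err != nil {
//		// connection-fatal: record the exit status so the outer loop reconnects
//	}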
-func (s *InformantServer) MonitorUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) error { - rawResources := to.ConvertToRaw(s.runner.vm.Mem.SlotSize) - cpu := rawResources.Cpu.AsApproximateFloat64() - mem := uint64(rawResources.Memory.Value()) - - timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds) - - _, err := s.dispatcher.Call(ctx, timeout, "UpscaleNotification", api.UpscaleNotification{ - Granted: api.Allocation{Cpu: cpu, Mem: mem}, - }) - if err != nil { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: false, - }) - return err - } - return nil -} - -// MonitorDownscale is the equivalent of (*InformantServer).Downscale for -// when we're connected to a monitor. -// -// # This method MUST be called while holding s.requestLock AND NOT s.runner.lock -// -// *Note*: Locking requestLock is not technically necessary, but it allows for -// better serialization. For example, if this function exits, we know that no -// other dispatcher calls will be made. -func (s *InformantServer) MonitorDownscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error) { - rawResources := to.ConvertToRaw(s.runner.vm.Mem.SlotSize) - cpu := rawResources.Cpu.AsApproximateFloat64() - mem := uint64(rawResources.Memory.Value()) - - timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds) - - res, err := s.dispatcher.Call(ctx, timeout, "DownscaleRequest", api.DownscaleRequest{ - Target: api.Allocation{Cpu: cpu, Mem: mem}, - }) - if err != nil { - s.runner.lock.Lock() - defer s.runner.lock.Unlock() - s.exit(InformantServerExitStatus{ - Err: err, - RetryShouldFix: false, - }) - return nil, err - } - - return res.Result, nil -} diff --git a/pkg/agent/prommetrics.go b/pkg/agent/prommetrics.go index 4e491209a..cf9fdd09c 100644 --- a/pkg/agent/prommetrics.go +++ b/pkg/agent/prommetrics.go @@ -12,10 +12,10 @@ type PromMetrics struct { schedulerRequestedChange resourceChangePair schedulerApprovedChange resourceChangePair - informantRequestsOutbound *prometheus.CounterVec - informantRequestsInbound *prometheus.CounterVec - informantRequestedChange resourceChangePair - informantApprovedChange resourceChangePair + monitorRequestsOutbound *prometheus.CounterVec + monitorRequestsInbound *prometheus.CounterVec + monitorRequestedChange resourceChangePair + monitorApprovedChange resourceChangePair neonvmRequestsOutbound *prometheus.CounterVec neonvmRequestedChange resourceChangePair @@ -102,49 +102,49 @@ func makePrometheusParts(globalstate *agentState) (PromMetrics, *prometheus.Regi )), }, - // ---- INFORMANT ---- - informantRequestsOutbound: util.RegisterMetric(reg, prometheus.NewCounterVec( + // ---- MONITOR ---- + monitorRequestsOutbound: util.RegisterMetric(reg, prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "autoscaling_agent_informant_outbound_requests_total", - Help: "Number of attempted HTTP requests to vm-informants by autoscaler-agents", + Name: "autoscaling_agent_monitor_outbound_requests_total", + Help: "Number of attempted HTTP requests to vm-monitors by autoscaler-agents", }, []string{"endpoint", "code"}, )), - informantRequestsInbound: util.RegisterMetric(reg, prometheus.NewCounterVec( + monitorRequestsInbound: util.RegisterMetric(reg, prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "autoscaling_agent_informant_inbound_requests_total", - Help: "Number of HTTP requests from vm-informants received by 
autoscaler-agents", + Name: "autoscaling_agent_monitor_inbound_requests_total", + Help: "Number of HTTP requests from vm-monitors received by autoscaler-agents", }, []string{"endpoint", "code"}, )), - informantRequestedChange: resourceChangePair{ + monitorRequestedChange: resourceChangePair{ cpu: util.RegisterMetric(reg, prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "autoscaling_agent_informant_requested_cpu_change_total", - Help: "Total change in CPU requested from the informant(s)", + Name: "autoscaling_agent_monitor_requested_cpu_change_total", + Help: "Total change in CPU requested from the vm-monitor(s)", }, []string{directionLabel}, )), mem: util.RegisterMetric(reg, prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "autoscaling_agent_informant_requested_mem_change_total", - Help: "Total change in memory (in MiB) requested from the informant(s)", + Name: "autoscaling_agent_monitor_requested_mem_change_total", + Help: "Total change in memory (in MiB) requested from the vm-monitor(s)", }, []string{directionLabel}, )), }, - informantApprovedChange: resourceChangePair{ + monitorApprovedChange: resourceChangePair{ cpu: util.RegisterMetric(reg, prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "autoscaling_agent_informant_approved_cpu_change_total", - Help: "Total change in CPU approved by the informant(s)", + Name: "autoscaling_agent_monitor_approved_cpu_change_total", + Help: "Total change in CPU approved by the vm-monitor(s)", }, []string{directionLabel}, )), mem: util.RegisterMetric(reg, prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "autoscaling_agent_informant_approved_mem_change_total", - Help: "Total change in memory (in MiB) approved by the informant(s)", + Name: "autoscaling_agent_monitor_approved_mem_change_total", + Help: "Total change in memory (in MiB) approved by the vm-monitor(s)", }, []string{directionLabel}, )), @@ -219,9 +219,9 @@ func makePrometheusParts(globalstate *agentState) (PromMetrics, *prometheus.Regi // scheduler: metrics.schedulerRequestedChange, metrics.schedulerApprovedChange, - // informant: - metrics.informantRequestedChange, - metrics.informantApprovedChange, + // monitor: + metrics.monitorRequestedChange, + metrics.monitorApprovedChange, // neonvm: metrics.neonvmRequestedChange, } diff --git a/pkg/agent/runner.go b/pkg/agent/runner.go index 25493426a..8dda7e5a1 100644 --- a/pkg/agent/runner.go +++ b/pkg/agent/runner.go @@ -12,7 +12,7 @@ package agent // 1. It should be OK to panic, if an error is truly unrecoverable // 2. A single Runner's panic shouldn't bring down the entire autoscaler-agent¹ // 3. We want to expose a State() method to view (almost) all internal state -// 4. Some high-level actions (e.g., HTTP request to Informant; update VM to desired state) require +// 4. Some high-level actions (e.g., call to vm-monitor; update VM to desired state) require // that we have *at most* one such action running at a time. // // There are a number of possible solutions to this set of goals. All reasonable solutions require @@ -24,9 +24,7 @@ package agent // * "track scheduler" // * "get metrics" // * "handle VM resources" - using metrics, calculates target resources level and contacts -// scheduler, informant, and NeonVM -- the "scaling" part of "autoscaling". -// * "informant server loop" - keeps Runner.informant and Runner.server up-to-date. -// * ... and a few more. +// scheduler, vm-monitor, and NeonVM -- the "scaling" part of "autoscaling". 
// * Each thread makes *synchronous* HTTP requests while holding the necessary lock to prevent any other // thread from making HTTP requests to the same entity. For example: // * All requests to NeonVM and the scheduler plugin are guarded by Runner.requestLock, which @@ -129,24 +127,17 @@ type Runner struct { // from non-nil to nil. The data behind each pointer is immutable, but the value of the pointer // itself is not. lastMetrics *api.Metrics - // requestedUpscale provides information about any requested upscaling by a VM informant - // - // This value is reset whenever we start a new informant server + + // requestedUpscale provides information about any requested upscaling by the vm-monitor requestedUpscale api.MoreResources // scheduler is the current scheduler that we're communicating with, or nil if there isn't one. // Each scheduler's info field is immutable. When a scheduler is replaced, only the pointer // value here is updated; the original Scheduler remains unchanged. scheduler *Scheduler - server *InformantServer - // informant holds the most recent InformantDesc that an InformantServer has received in its - // normal operation. If there has been at least one InformantDesc received, this field will not - // be nil. - // - // This field really should not be used except for providing RunnerState. The correct interface - // is through server.Informant(), which does all the appropriate error handling if the - // connection to the informant is not in a suitable state. - informant *api.InformantDesc + // monitor, if non nil, stores the current Dispatcher in use for communicating with the + // vm-monitor + monitor *Dispatcher // computeUnit is the latest Compute Unit reported by a scheduler. It may be nil, if we haven't // been able to contact one yet. // @@ -161,12 +152,6 @@ type Runner struct { // to the current scheduler. This field is not nil only when scheduler is not nil. lastSchedulerError error - // lastInformantError provides the error that occurred - if any - during the most recent request - // to the VM informant. - // - // This field MUST NOT be updated without holding BOTH lock AND server.requestLock. - lastInformantError error - // backgroundWorkerCount tracks the current number of background workers. 
It is exclusively // updated by r.spawnBackgroundWorker backgroundWorkerCount atomic.Int64 @@ -214,18 +199,15 @@ type Scheduler struct { // RunnerState is the serializable state of the Runner, extracted by its State method type RunnerState struct { - PodIP string `json:"podIP"` - VM api.VmInfo `json:"vm"` - LastMetrics *api.Metrics `json:"lastMetrics"` - RequestedUpscale api.MoreResources `json:"requestedUpscale"` - Scheduler *SchedulerState `json:"scheduler"` - Server *InformantServerState `json:"server"` - Informant *api.InformantDesc `json:"informant"` - ComputeUnit *api.Resources `json:"computeUnit"` - LastApproved *api.Resources `json:"lastApproved"` - LastSchedulerError error `json:"lastSchedulerError"` - LastInformantError error `json:"lastInformantError"` - BackgroundWorkerCount int64 `json:"backgroundWorkerCount"` + PodIP string `json:"podIP"` + VM api.VmInfo `json:"vm"` + LastMetrics *api.Metrics `json:"lastMetrics"` + RequestedUpscale api.MoreResources `json:"requestedUpscale"` + Scheduler *SchedulerState `json:"scheduler"` + ComputeUnit *api.Resources `json:"computeUnit"` + LastApproved *api.Resources `json:"lastApproved"` + LastSchedulerError error `json:"lastSchedulerError"` + BackgroundWorkerCount int64 `json:"backgroundWorkerCount"` SchedulerRespondedWithMigration bool `json:"migrationStarted"` } @@ -252,29 +234,13 @@ func (r *Runner) State(ctx context.Context) (*RunnerState, error) { } } - var serverState *InformantServerState - if r.server != nil { - serverState = &InformantServerState{ - Desc: r.server.desc, - SeqNum: r.server.seqNum, - ReceivedIDCheck: r.server.receivedIDCheck, - MadeContact: r.server.madeContact, - ProtoVersion: r.server.protoVersion, - Mode: r.server.mode, - ExitStatus: r.server.exitStatus, - } - } - return &RunnerState{ LastMetrics: r.lastMetrics, RequestedUpscale: r.requestedUpscale, Scheduler: scheduler, - Server: serverState, - Informant: r.informant, ComputeUnit: r.computeUnit, LastApproved: r.lastApproved, LastSchedulerError: r.lastSchedulerError, - LastInformantError: r.lastInformantError, VM: r.vm, PodIP: r.podIP, BackgroundWorkerCount: r.backgroundWorkerCount.Load(), @@ -351,9 +317,7 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util sendMetricsSignal, recvMetricsSignal := util.NewCondChannelPair() // signal when new schedulers are *registered* sendSchedSignal, recvSchedSignal := util.NewCondChannelPair() - // signal when r.informant is updated - sendInformantUpd, recvInformantUpd := util.NewCondChannelPair() - // signal when the informant requests upscaling + // signal when the vm-monitor requests upscaling sendUpscaleRequested, recvUpscaleRequested := util.NewCondChannelPair() logger.Info("Starting background workers") @@ -371,13 +335,13 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util r.trackSchedulerLoop(c, l, scheduler, schedulerWatch, sendSchedSignal) }) r.spawnBackgroundWorker(ctx, logger, "get metrics", func(c context.Context, l *zap.Logger) { - r.getMetricsLoop(c, l, sendMetricsSignal, recvInformantUpd) + r.getMetricsLoop(c, l, sendMetricsSignal) }) r.spawnBackgroundWorker(ctx, logger, "handle VM resources", func(c context.Context, l *zap.Logger) { r.handleVMResources(c, l, recvMetricsSignal, recvUpscaleRequested, recvSchedSignal, vmInfoUpdated) }) - r.spawnBackgroundWorker(ctx, logger, "informant server loop", func(c context.Context, l *zap.Logger) { - r.serveInformantLoop(c, l, sendInformantUpd, sendUpscaleRequested) + r.spawnBackgroundWorker(ctx, 
logger.Named("vm-monitor"), "vm-monitor reconnection loop", func(c context.Context, l *zap.Logger) { + r.connectToMonitorLoop(c, l, sendUpscaleRequested) }) // Note: Run doesn't terminate unless the parent context is cancelled - either because the VM @@ -458,7 +422,6 @@ func (r *Runner) getMetricsLoop( ctx context.Context, logger *zap.Logger, newMetrics util.CondChannelSender, - updatedInformant util.CondChannelReceiver, ) { timeout := time.Second * time.Duration(r.global.config.Metrics.RequestTimeoutSeconds) waitBetweenDuration := time.Second * time.Duration(r.global.config.Metrics.SecondsBetweenRequests) @@ -467,7 +430,7 @@ func (r *Runner) getMetricsLoop( minWaitDuration := time.Second for { - metrics, err := r.doMetricsRequestIfEnabled(ctx, logger, timeout, updatedInformant.Consume) + metrics, err := r.doMetricsRequest(ctx, logger, timeout) if err != nil { logger.Error("Error making metrics request", zap.Error(err)) goto next @@ -494,16 +457,11 @@ func (r *Runner) getMetricsLoop( case <-minWait: } - // After waiting for the required minimum, allow shortcutting the normal wait if the - // informant was updated select { case <-ctx.Done(): return - case <-updatedInformant.Recv(): - logger.Info("Shortcutting normal metrics wait because informant was updated") case <-waitBetween: } - } } @@ -641,146 +599,99 @@ func (r *Runner) handleVMResources( } } -// serveInformantLoop repeatedly creates an InformantServer to handle communications with the VM -// informant -// -// This function directly sets the value of r.server and indirectly sets r.informant. -func (r *Runner) serveInformantLoop( +// connectToMonitorLoop does lifecycle management of the (re)connection to the vm-monitor +func (r *Runner) connectToMonitorLoop( ctx context.Context, logger *zap.Logger, - updatedInformant util.CondChannelSender, upscaleRequested util.CondChannelSender, ) { - // variables set & accessed across loop iterations - var ( - normalRetryWait <-chan time.Time - minRetryWait <-chan time.Time - lastStart time.Time - ) + addr := fmt.Sprintf("ws://%s:%d/monitor", r.podIP, r.global.config.Monitor.ServerPort) - // Loop-invariant duration constants - minWait := time.Second * time.Duration(r.global.config.Informant.RetryServerMinWaitSeconds) - normalWait := time.Second * time.Duration(r.global.config.Informant.RetryServerNormalWaitSeconds) - retryRegister := time.Second * time.Duration(r.global.config.Informant.RegisterRetrySeconds) + minWait := time.Second * time.Duration(r.global.config.Monitor.ConnectionRetryMinWaitSeconds) + var lastStart time.Time -retryServer: - for { - // On each (re)try, unset the informant's requested upscale. We need to do this *before* - // starting the server, because otherwise it's possible for a racy /try-upscale request to - // sneak in before we reset it, which would cause us to incorrectly ignore the request. 
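// A note on the signalling primitive used here: util.CondChannelPair behaves as
// a coalescing, level-triggered flag. Send marks it pending (repeat Sends
// collapse into one), Unsend reports whether it cancelled a still-pending
// signal, and the receiving half exposes Recv and Consume. A hypothetical
// one-type reduction, only to pin down the semantics relied on below:
//
//	type condChannel struct{ ch chan struct{} } // ch := make(chan struct{}, 1)
//
//	func (c condChannel) Send() {
//		select {
//		case c.ch <- struct{}{}: // now pending
//		default: // already pending; coalesce
//		}
//	}
//
//	func (c condChannel) Unsend() bool {
//		select {
//		case <-c.ch:
//			return true // cancelled a pending signal
//		default:
//			return false
//		}
//	}
//
//	func (c condChannel) Recv() <-chan struct{} { return c.ch }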
- if upscaleRequested.Unsend() { - logger.Info("Cancelled existing 'upscale requested' signal due to informant server restart") - } + for i := 0; ; i += 1 { + // Remove any prior Dispatcher from the Runner + func() { + r.lock.Lock() + defer r.lock.Unlock() + r.monitor = nil + }() - if normalRetryWait != nil { - logger.Info("Retrying informant server after delay", zap.Duration("delay", normalWait)) - select { - case <-ctx.Done(): - return - case <-normalRetryWait: + // If the context was canceled, don't restart + if err := ctx.Err(); err != nil { + action := "attempt" + if i != 0 { + action = "retry " } + logger.Info( + fmt.Sprintf("Aborting vm-monitor connection %s because context is already canceled", action), + zap.Error(err), + ) + return } - if minRetryWait != nil { - select { - case <-minRetryWait: - logger.Info("Retrying informant server") - default: + // Delayed restart management, long because of friendly logging: + if i != 0 { + endTime := time.Now() + runtime := endTime.Sub(lastStart) + + if runtime > minWait { + logger.Info( + "Immediately retrying connection to vm-monitor", + zap.String("addr", addr), + zap.Duration("totalRuntime", runtime), + ) + } else { + delay := minWait - runtime logger.Info( - "Informant server ended quickly. Respecting minimum delay before restart", - zap.Duration("activeTime", time.Since(lastStart)), zap.Duration("delay", minWait), + "Connection to vm-monitor was not live for long, retrying after delay", + zap.Duration("delay", delay), + zap.Duration("totalRuntime", runtime), ) + select { + case <-time.After(delay): + logger.Info( + "Retrying connection to vm-monitor", + zap.Duration("delay", delay), + zap.Duration("waitTime", time.Since(endTime)), + zap.String("addr", addr), + ) case <-ctx.Done(): + logger.Info( + "Canceling retrying connection to vm-monitor", + zap.Duration("delay", delay), + zap.Duration("waitTime", time.Since(endTime)), + zap.Error(ctx.Err()), + ) return - case <-minRetryWait: } } + } else { + logger.Info("Connecting to vm-monitor", zap.String("addr", addr)) } - normalRetryWait = nil // only "long wait" if an error occurred - minRetryWait = time.After(minWait) - lastStart = time.Now() - - server, exited, err := NewInformantServer(ctx, logger, r, updatedInformant, upscaleRequested) - if ctx.Err() != nil { - if err != nil { - logger.Warn("Error starting informant server (but context canceled)", zap.Error(err)) - } - return - } else if err != nil { - normalRetryWait = time.After(normalWait) - logger.Error("Error starting informant server", zap.Error(err)) - continue retryServer + dispatcher, err := NewDispatcher(ctx, logger, addr, r, upscaleRequested) + if err != nil { + logger.Error("Failed to connect to vm-monitor", zap.String("addr", addr), zap.Error(err)) + continue } - // Update r.server: + // Update runner to the new dispatcher func() { r.lock.Lock() defer r.lock.Unlock() - - var kind string - if r.server == nil { - kind = "Setting" - } else { - kind = "Updating" - } - - logger.Info(fmt.Sprintf("%s initial informant server", kind), zap.Object("server", server.desc)) - r.server = server + r.monitor = dispatcher }() - logger.Info("Registering with informant") - - // Try to register with the informant: - retryRegister: - for { - err := server.RegisterWithInformant(ctx, logger) - if err == nil { - break // all good; wait for the server to finish. 
- } else if ctx.Err() != nil { - if err != nil { - logger.Warn("Error registering with informant (but context cancelled)", zap.Error(err)) - } - return - } - - logger.Warn("Error registering with informant", zap.Error(err)) + // Wait until the dispatcher is no longer running, either due to error or because the + // root-level Runner context was canceled. + <-dispatcher.ExitSignal() - // Server exited; can't just retry registering. - if server.ExitStatus() != nil { - normalRetryWait = time.After(normalWait) - continue retryServer - } - - // Wait before retrying registering - logger.Info("Retrying registering with informant after delay", zap.Duration("delay", retryRegister)) - select { - case <-time.After(retryRegister): - continue retryRegister - case <-ctx.Done(): - return - } - } - - // Wait for the server to finish - select { - case <-ctx.Done(): - return - case <-exited.Recv(): - } - - // Server finished - exitStatus := server.ExitStatus() - if exitStatus == nil { - panic(errors.New("Informant server signalled end but ExitStatus() == nil")) - } - - if !exitStatus.RetryShouldFix { - normalRetryWait = time.After(normalWait) + if err := dispatcher.ExitError(); err != nil { + logger.Error("Dispatcher for vm-monitor connection exited due to error", zap.Error(err)) } - - continue retryServer } } @@ -948,65 +859,13 @@ waitForNewScheduler: // Lower-level implementation functions // ////////////////////////////////////////// -// doMetricsRequestIfEnabled makes a single metrics request to the VM informant, returning it -// -// This method expects that the Runner is not locked. -func (r *Runner) doMetricsRequestIfEnabled( +// doMetricsRequest makes a single metrics request to the VM +func (r *Runner) doMetricsRequest( ctx context.Context, logger *zap.Logger, timeout time.Duration, - clearNewInformantSignal func(), ) (*api.Metrics, error) { - logger.Info("Attempting metrics request") - - // FIXME: the region where the lock is held should be extracted into a separate method, called - // something like buildMetricsRequest(). - - r.lock.Lock() - locked := true - defer func() { - if locked { - r.lock.Unlock() - } - }() - - // Only clear the signal once we've locked, so that we're not racing. - // - // We don't *need* to do this, but its only cost is a small amount of code complexity, and it's - // nice to have have the guarantees around not racing. 
- clearNewInformantSignal() - - if r.server == nil || r.server.mode != InformantServerRunning { - var state = "unset" - if r.server != nil { - state = string(r.server.mode) - } - - logger.Info(fmt.Sprintf("Cannot make metrics request because informant server is %s", state)) - return nil, nil - } - - if r.informant == nil { - panic(errors.New("r.informant == nil but r.server.mode == InformantServerRunning")) - } - - var url string - var handle func(body []byte) (*api.Metrics, error) - - switch { - case r.informant.MetricsMethod.Prometheus != nil: - url = fmt.Sprintf("http://%s:%d/metrics", r.podIP, r.informant.MetricsMethod.Prometheus.Port) - handle = func(body []byte) (*api.Metrics, error) { - m, err := api.ReadMetrics(body, r.global.config.Metrics.LoadMetricPrefix) - if err != nil { - err = fmt.Errorf("Error reading metrics from prometheus output: %w", err) - } - return &m, err - } - default: - // Ok to panic here because this should be handled by the informant server - panic(errors.New("server's InformantDesc has unknown metrics method")) - } + url := fmt.Sprintf("http://%s:%d/metrics", r.podIP, r.global.config.Metrics.Port) reqCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -1016,10 +875,6 @@ func (r *Runner) doMetricsRequestIfEnabled( panic(fmt.Errorf("Error constructing metrics request to %q: %w", url, err)) } - // Unlock while we perform the request: - locked = false - r.lock.Unlock() - logger.Info("Making metrics request to VM", zap.String("url", url)) resp, err := http.DefaultClient.Do(req) @@ -1039,7 +894,12 @@ func (r *Runner) doMetricsRequestIfEnabled( return nil, fmt.Errorf("Unsuccessful response status %d: %s", resp.StatusCode, string(body)) } - return handle(body) + m, err := api.ReadMetrics(body, r.global.config.Metrics.LoadMetricPrefix) + if err != nil { + return nil, fmt.Errorf("Error reading metrics from prometheus output: %w", err) + } + + return &m, nil } // VMUpdateReason provides context to (*Runner).updateVMResources about why an update to the VM's @@ -1091,16 +951,6 @@ func (r *Runner) updateVMResources( logger.Info("Updating VM resources", zap.String("reason", string(reason))) - // A /suspend request from a VM informant will wait until requestLock returns. So we're good to - // make whatever requests we need as long as the informant is here at the start. - // - // The reason we care about the informant server being "enabled" is that the VM informant uses - // it to ensure that there's at most one autoscaler-agent that's making requests on its behalf. - if err := r.validateInformant(); err != nil { - logger.Warn("Unable to update VM resources because informant server is disabled", zap.Error(err)) - return nil - } - // state variables var ( start api.Resources // r.vm.Using(), at the time of the start of this function - for metrics. @@ -1176,16 +1026,6 @@ func (r *Runner) updateVMResources( nowUsing, err := r.doVMUpdate(ctx, logger, state.VM.Using(), capped, rejectedDownscale) if err != nil { return fmt.Errorf("Error doing VM update 1: %w", err) - } else if nowUsing == nil { - // From the comment above doVMUpdate: - // - // > If the VM informant is required and unavailable (or becomes unavailable), this - // > method will: return nil, nil; log an appropriate warning; and reset the VM's - // > state to its current value. - // - // So we should just return nil. We can't update right now, and there isn't anything - // left to log. 
- return nil } state.VM.SetUsing(*nowUsing) @@ -1455,10 +1295,7 @@ func (s *AtomicUpdateState) requiredCUForRequestedUpscaling() uint32 { // SCHEDULER. It is the caller's responsibility to ensure that target is not greater than // r.lastApproved, and check with the scheduler if necessary. // -// If the VM informant is required and unavailable (or becomes unavailable), this method will: -// return nil, nil; log an appropriate warning; and reset the VM's state to its current value. -// -// If some resources in target are less than current, and the VM informant rejects the proposed +// If some resources in target are less than current, and the dispatcher rejects the proposed // downscaling, rejectedDownscale will be called. If it returns an error, that error will be // returned and the update will be aborted. Otherwise, the returned newTarget will be used. // @@ -1481,24 +1318,18 @@ func (r *Runner) doVMUpdate( r.vm.SetUsing(amount) } - if err := r.validateInformant(); err != nil { - logger.Warn("Aborting VM update because informant server is not valid", zap.Error(err)) - resetVMTo(current) - return nil, nil - } - - // If there's any fields that are being downscaled, request that from the VM informant. + // If there's any fields that are being downscaled, request that from the monitor. downscaled := current.Min(target) if downscaled != current { - r.recordResourceChange(current, downscaled, r.global.metrics.informantRequestedChange) + r.recordResourceChange(current, downscaled, r.global.metrics.monitorRequestedChange) - resp, err := r.doInformantDownscale(ctx, logger, downscaled) - if err != nil || resp == nil /* resp = nil && err = nil when the error has been handled */ { + resp, err := r.doDownscale(ctx, logger, downscaled) + if err != nil { return nil, err } if resp.Ok { - r.recordResourceChange(current, downscaled, r.global.metrics.informantApprovedChange) + r.recordResourceChange(current, downscaled, r.global.metrics.monitorApprovedChange) } else { newTarget, err := rejectedDownscale() if err != nil { @@ -1569,7 +1400,7 @@ func (r *Runner) doVMUpdate( r.global.metrics.neonvmRequestsOutbound.WithLabelValues("ok").Inc() - // We scaled. If we run into an issue around further communications with the informant, then + // We scaled. If we run into an issue around further communications with the monitor, then // it'll be left with an inconsistent state - there's not really anything we can do about that, // unfortunately. resetVMTo(target) @@ -1588,13 +1419,13 @@ func (r *Runner) doVMUpdate( r.requestedUpscale = r.requestedUpscale.And(upscaled.IncreaseFrom(current).Not()) }() - r.recordResourceChange(downscaled, upscaled, r.global.metrics.informantRequestedChange) + r.recordResourceChange(downscaled, upscaled, r.global.metrics.monitorRequestedChange) - if ok, err := r.doInformantUpscale(ctx, logger, upscaled); err != nil || !ok { + if ok, err := r.doUpscale(ctx, logger, upscaled); err != nil || !ok { return nil, err } - r.recordResourceChange(downscaled, upscaled, r.global.metrics.informantApprovedChange) + r.recordResourceChange(downscaled, upscaled, r.global.metrics.monitorApprovedChange) } logger.Info("Updated VM resources", zap.Object("current", current), zap.Object("target", target)) @@ -1634,105 +1465,60 @@ func (r *Runner) recordResourceChange(current, target api.Resources, metrics res } } -// validateInformant checks that the Runner's informant server is present AND active (i.e. not -// suspended). -// -// If either condition is false, this method returns error. 
This is typically used to check that the -// Runner is enabled before making a request to NeonVM or the scheduler, in which case holding -// r.requestLock is advised. -// -// This method MUST NOT be called while holding r.lock. -func (r *Runner) validateInformant() error { - r.lock.Lock() - defer r.lock.Unlock() - - // Automatically valid if we are talking to a monitor - if r.server.informantIsMonitor { - return nil - } - - if r.server == nil { - return errors.New("no informant server set") - } - return r.server.valid() -} - -// doInformantDownscale is a convenience wrapper around (*InformantServer).Downscale that locks r, -// checks if r.server is nil, and does the request. -// -// Some errors are logged by this method instead of being returned. If that happens, this method -// returns nil, nil. -// -// This method MUST NOT be called while holding r.lock. -func (r *Runner) doInformantDownscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error) { - msg := "Error requesting informant downscale" +// This function must NOT be called while holding r.lock. +func (r *Runner) doDownscale(ctx context.Context, logger *zap.Logger, to api.Resources) (*api.DownscaleResult, error) { + rawResources := to.ConvertToAllocation(r.vm.Mem.SlotSize) + cpu := rawResources.Cpu + mem := rawResources.Mem - server := func() *InformantServer { + dispatcher := func() *Dispatcher { r.lock.Lock() defer r.lock.Unlock() - return r.server + return r.monitor }() - if server == nil { - return nil, fmt.Errorf("%s: InformantServer is not set (this should not occur after startup)", msg) - } - - server.requestLock.Lock() - defer server.requestLock.Unlock() - downscale := server.Downscale - if server.informantIsMonitor { - downscale = server.MonitorDownscale + if dispatcher == nil { + return nil, errors.New("No active connection to vm-monitor") } - resp, err := downscale(ctx, logger, to) + timeout := time.Second * time.Duration(r.global.config.Monitor.ResponseTimeoutSeconds) + + res, err := dispatcher.Call(ctx, logger, timeout, "DownscaleRequest", api.DownscaleRequest{ + Target: api.Allocation{Cpu: cpu, Mem: mem}, + }) if err != nil { - if IsNormalInformantError(err) { - logger.Warn(msg, zap.Object("server", server.desc), zap.Error(err)) - return nil, nil - } else { - return nil, fmt.Errorf("%s: %w", msg, err) - } + logger.Error("monitor failed to downscale", zap.Error(err)) + return nil, err } - return resp, nil + return res.Result, nil } -// doInformantUpscale is a convenience wrapper around (*InformantServer).Upscale that locks r, -// checks if r.server is nil, and does the request. -// -// Some errors are logged by this method instead of being returned. If that happens, this method -// returns false, nil. -// -// This method MUST NOT be called while holding r.lock. -func (r *Runner) doInformantUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) (ok bool, _ error) { - msg := "Error notifying informant of upscale" +// This function must NOT be called while holding r.lock. 
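// doDownscale (above) and doUpscale (below) are deliberately invoked at
// different points by doVMUpdate: the vm-monitor must approve giving up
// resources *before* NeonVM shrinks the VM, but is only notified of new
// resources *after* NeonVM has actually granted them. In outline (a sketch of
// the call order only, without the error handling):
//
//	downscaled := current.Min(target)
//	// 1. if downscaled != current, ask the monitor; a rejection may raise target
//	// 2. patch NeonVM to the (possibly adjusted) target
//	// 3. notify the monitor of any dimensions that grew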
 
-// doInformantUpscale is a convenience wrapper around (*InformantServer).Upscale that locks r,
-// checks if r.server is nil, and does the request.
-//
-// Some errors are logged by this method instead of being returned. If that happens, this method
-// returns false, nil.
-//
-// This method MUST NOT be called while holding r.lock.
-func (r *Runner) doInformantUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) (ok bool, _ error) {
-	msg := "Error notifying informant of upscale"
+// doUpscale notifies the vm-monitor of the resources it has been granted.
+//
+// This function must NOT be called while holding r.lock.
+func (r *Runner) doUpscale(ctx context.Context, logger *zap.Logger, to api.Resources) (ok bool, _ error) {
+	rawResources := to.ConvertToAllocation(r.vm.Mem.SlotSize)
+	cpu := rawResources.Cpu
+	mem := rawResources.Mem
 
-	server := func() *InformantServer {
+	dispatcher := func() *Dispatcher {
		r.lock.Lock()
 		defer r.lock.Unlock()
-		return r.server
+		return r.monitor
 	}()
-	if server == nil {
-		return false, fmt.Errorf("%s: InformantServer is not set (this should not occur after startup)", msg)
-	}
-
-	server.requestLock.Lock()
-	defer server.requestLock.Unlock()
-	upscale := server.Upscale
-	if server.informantIsMonitor {
-		upscale = server.MonitorUpscale
+	if dispatcher == nil {
+		return false, errors.New("no active connection to vm-monitor")
 	}
 
-	if err := upscale(ctx, logger, to); err != nil {
-		if IsNormalInformantError(err) {
-			logger.Warn(msg, zap.Error(err))
-			return false, nil
-		} else {
-			return false, fmt.Errorf("%s: %w", msg, err)
-		}
-	}
+	timeout := time.Second * time.Duration(r.global.config.Monitor.ResponseTimeoutSeconds)
+	_, err := dispatcher.Call(ctx, logger, timeout, "UpscaleNotification", api.UpscaleNotification{
+		Granted: api.Allocation{Cpu: cpu, Mem: mem},
+	})
+	if err != nil {
+		logger.Error("failed to notify vm-monitor of upscale", zap.Error(err))
+		return false, err
+	}
 
 	return true, nil
 }
diff --git a/pkg/api/types.go b/pkg/api/types.go
index adc08f5b5..c629405bf 100644
--- a/pkg/api/types.go
+++ b/pkg/api/types.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"reflect"
 
-	"github.com/google/uuid"
 	"go.uber.org/zap/zapcore"
 
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -234,11 +233,11 @@ func (r Resources) IncreaseFrom(old Resources) MoreResources {
 	}
 }
 
-// ConvertToRaw produces the RawResources equivalent to these Resources with the given slot size
-func (r Resources) ConvertToRaw(memSlotSize *resource.Quantity) RawResources {
-	return RawResources{
-		Cpu:    r.VCPU.ToResourceQuantity(),
-		Memory: resource.NewQuantity(int64(r.Mem)*memSlotSize.Value(), resource.BinarySI),
+// ConvertToAllocation produces the Allocation equivalent to these Resources with the given slot size
+func (r Resources) ConvertToAllocation(memSlotSize *resource.Quantity) Allocation {
+	return Allocation{
+		Cpu: r.VCPU.ToResourceQuantity().AsApproximateFloat64(),
+		Mem: uint64(int64(r.Mem) * memSlotSize.Value()),
 	}
 }
 
@@ -275,251 +274,11 @@ type PluginResponse struct {
 // TODO: fill this with more information as required
 type MigrateResponse struct{}
 
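The ConvertToRaw to ConvertToAllocation change swaps resource.Quantity values for plain numbers: CPU becomes an approximate float64, and memory becomes a byte count (slots times slot size). The arithmetic is small enough to check standalone; the two conversion lines below mirror the new function body, and the input values are made up:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Example only: a VM using 2 memory slots of 1Gi each, and 500m of CPU.
	memSlots := int64(2)
	slotSize := resource.MustParse("1Gi")
	cpu := resource.MustParse("500m")

	// Same arithmetic as ConvertToAllocation: CPU as an approximate float,
	// memory as a byte count.
	allocCpu := cpu.AsApproximateFloat64()          // 0.5
	allocMem := uint64(memSlots * slotSize.Value()) // 2147483648

	fmt.Printf("Allocation{Cpu: %v, Mem: %v}\n", allocCpu, allocMem)
}
```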
-///////////////////////////
-// VM Informant Messages //
-///////////////////////////
-
-// InformantProtoVersion represents a single version of the agent<->informant protocol
-//
-// Each version of the agent<->informant protocol is named independently from releases of the
-// repository containing this code. Names follow semver, although this does not necessarily
-// guarantee support - for example, the VM informant may only support versions above v1.1.
-//
-// Version compatibility is documented in the neighboring file VERSIONING.md.
-type InformantProtoVersion uint32
-
-const (
-	// InformantProtoV1_0 represents v1.0 of the agent<->informant protocol - the initial version.
-	//
-	// Last used in release version 0.1.2.
-	InformantProtoV1_0 InformantProtoVersion = iota + 1 // +1 so we start from 1
-
-	// InformantProtoV1_1 represents v1.1 of the agent<->informant protocol.
-	//
-	// Changes from v1.0:
-	//
-	// * Adds /try-upscale endpoint to the autoscaler-agent.
-	//
-	// Last used in release version v0.6.0.
-	InformantProtoV1_1
-
-	// InformantProtoV1_2 represents v1.2 of the agent<->informant protocol.
-	//
-	// Changes from v1.1:
-	//
-	// * Adds /health-check endpoint to the vm-informant.
-	//
-	// Last used in release version v0.9.0
-	InformantProtoV1_2
-
-	// InformantProtoV2_0 represents v2.0 of the agent<->informant protocol.
-	//
-	// Changes from v1.2:
-	//
-	// * Agents now return a AgentResourceMessage when notifying VM's of changes
-	//   in resources on their /upscale and /downscale endpoints. Since
-	//   RawResources (the response type in previous protocols) is not
-	//   deserializable out of an AgentResourceMessage, this is a breaking
-	//   change.
-	//
-	// Currently the latest version.
-	InformantProtoV2_0
-
-	// latestInformantProtoVersion represents the latest version of the agent<->informant protocol
-	//
-	// This value is kept private because it should not be used externally; any desired
-	// functionality that could be implemented with it should instead be a method on
-	// InformantProtoVersion.
-	latestInformantProtoVersion InformantProtoVersion = iota // excluding +1 makes it equal to previous
-)
-
-func (v InformantProtoVersion) String() string {
-	var zero InformantProtoVersion
-
-	switch v {
-	case zero:
-		return "<invalid: zero>"
-	case InformantProtoV1_0:
-		return "v1.0"
-	case InformantProtoV1_1:
-		return "v1.1"
-	case InformantProtoV1_2:
-		return "v1.2"
-	case InformantProtoV2_0:
-		return "v2.0"
-	default:
-		diff := v - latestInformantProtoVersion
-		return fmt.Sprintf("<unknown = %v + %d>", latestInformantProtoVersion, diff)
-	}
-}
-
-// IsValid returns whether the protocol version is valid. The zero value is not valid.
-func (v InformantProtoVersion) IsValid() bool {
-	return uint(v) != 0
-}
-
-// HasTryUpscale returns whether this version of the protocol has the /try-upscale endpoint
-//
-// This is true for version v1.1 and greater.
-func (v InformantProtoVersion) HasTryUpscale() bool {
-	return v >= InformantProtoV1_1
-}
-
-// AllowsHealthCheck returns whether this version of the protocol has the informant's /health-check
-// endpoint
-//
-// This is true for version v1.2 and greater.
-func (v InformantProtoVersion) AllowsHealthCheck() bool {
-	return v >= InformantProtoV1_2
-}
-
-// SignsResourceUpdates returns whether agents inform VMs of resource updates with an
-// AgentResourceMessage in this version of the protocol
-//
-// This is true for version v2.0 and greater
-func (v InformantProtoVersion) SignsResourceUpdates() bool {
-	return v >= InformantProtoV2_0
-}
-
-// AgentMessage is used for (almost) every message sent from the autoscaler-agent to the VM
-// informant, and serves to wrap the type T with a SequenceNumber
-//
-// The SequenceNumber provides a total ordering of states, even if the ordering of HTTP requests and
-// responses are out of order. Fundamentally this is required because we have bidirectional
-// communication between the autoscaler-agent and VM informant — without it, we run the risk of racy
-// behavior, which could *actually* result in data corruption.
-type AgentMessage[T any] struct {
-	// Data is the content of the request or response
-	Data T `json:"data"`
-
-	// SequenceNumber is a unique-per-instance monotonically increasing number passed in each
-	// non-initial message from the autoscaler-agent to the VM informant, both requests and
-	// responses.
-	SequenceNumber uint64 `json:"sequenceNumber"`
-}
-
-// AgentDesc is the first message sent from an autoscaler-agent to a VM informant, describing some
-// information about the autoscaler-agent
-//
-// Each time an autoscaler-agent (re)connects to a VM informant, it sends an AgentDesc to the
-// "/register" endpoint.
-//
-// For more information on the agent<->informant protocol, refer to the top-level ARCHITECTURE.md
-type AgentDesc struct {
-	// AgentID is a unique UUID for the current instance of the autoscaler-agent
-	//
-	// This is helpful so that we can distinguish between (incorrect) duplicate calls to /register
-	// and (correct) re-registering of an agent.
-	AgentID uuid.UUID `json:"agentID"`
-
-	// ServeAddr gives the unique (per instance)
-	ServerAddr string `json:"agentServeAddr"`
-
-	// MinProtoVersion is the minimum version of the agent<->informant protocol that the
-	// autoscaler-agent supports
-	//
-	// Protocol versions are always non-zero.
-	//
-	// AgentDesc must always have MinProtoVersion <= MaxProtoVersion.
-	MinProtoVersion InformantProtoVersion `json:"minProtoVersion"`
-
-	// MaxProtoVersion is the maximum version of the agent<->informant protocol that the
-	// autoscaler-agent supports, inclusive.
-	//
-	// Protocol versions are always non-zero.
-	//
-	// AgentDesc must always have MinProtoVersion <= MaxProtoVersion.
-	MaxProtoVersion InformantProtoVersion `json:"maxProtoVersion"`
-}
-
-// MarshalLogObject implements zapcore.ObjectMarshaler, so that Resources can be used with zap.Object
-func (d AgentDesc) MarshalLogObject(enc zapcore.ObjectEncoder) error {
-	enc.AddString("agentID", d.AgentID.String())
-	enc.AddString("agentServeAddr", string(d.ServerAddr))
-	enc.AddString("minProtoVersion", d.MinProtoVersion.String())
-	enc.AddString("maxProtoVersion", d.MaxProtoVersion.String())
-	return nil
-}
-
-// ProtocolRange returns a VersionRange from d.MinProtoVersion to d.MaxProtoVersion.
-func (d AgentDesc) ProtocolRange() VersionRange[InformantProtoVersion] {
-	return VersionRange[InformantProtoVersion]{
-		Min: d.MinProtoVersion,
-		Max: d.MaxProtoVersion,
-	}
-}
-
-type AgentIdentificationMessage = AgentMessage[AgentIdentification]
-
-// AgentIdentification affirms the AgentID of the autoscaler-agent in its initial response to a VM
-// informant, on the /id endpoint. This response is always wrapped in an AgentMessage. A type alias
-// for this is provided as AgentIdentificationMessage, for convenience.
-type AgentIdentification struct {
-	// AgentID is the same AgentID as given in the AgentDesc initially provided to the VM informant
-	AgentID uuid.UUID `json:"agentID"`
-}
-
-// InformantDesc describes the capabilities of a VM informant, in response to an autoscaler-agent's
-// request on the /register endpoint
-//
-// For more information on the agent<->informant protocol, refer to the top-level ARCHITECTURE.md
-type InformantDesc struct {
-	// ProtoVersion is the version of the agent<->informant protocol that the VM informant has
-	// selected
-	//
-	// If an autoscaler-agent is successfully registered, a well-behaved VM informant MUST respond
-	// with a ProtoVersion within the bounds of the agent's declared minimum and maximum protocol
-	// versions. If the VM informant does not use a protocol version within those bounds, then it
-	// MUST respond with an error status code.
-	ProtoVersion InformantProtoVersion `json:"protoVersion"`
-
-	// MetricsMethod tells the autoscaler-agent how to fetch metrics from the VM
-	MetricsMethod InformantMetricsMethod `json:"metricsMethod"`
-}
-
-// InformantMetricsMethod collects the options for ways the VM informant can report metrics
-//
-// At least one method *must* be provided in an InformantDesc, and more than one method gives the
-// autoscaler-agent freedom to choose.
-//
-// We use this type so it's easier to ensure backwards compatibility with previous versions of the
-// VM informant — at least during the rollout of new autoscaler-agent or VM informant versions.
-type InformantMetricsMethod struct {
-	// Prometheus describes prometheus-format metrics, typically not through the informant itself
-	Prometheus *MetricsMethodPrometheus `json:"prometheus,omitempty"`
-}
-
-// MetricsMethodPrometheus describes VM informant's metrics in the prometheus format, made available
-// on a particular port
-type MetricsMethodPrometheus struct {
-	Port uint16 `json:"port"`
-}
-
-// InformantHealthCheckResp is the result of a successful request to a VM informant's /health-check
-// endpoint.
-type InformantHealthCheckResp struct{}
-
-// UnregisterAgent is the result of a successful request to a VM informant's /unregister endpoint
-type UnregisterAgent struct {
-	// WasActive indicates whether the unregistered autoscaler-agent was the one in-use by the VM
-	// informant
-	WasActive bool `json:"wasActive"`
-}
-
-// MoreResourcesRequest is the request type wrapping MoreResources that's sent by the VM informant
-// to the autoscaler-agent's /try-upscale endpoint when the VM is urgently in need of more
-// resources.
-type MoreResourcesRequest struct {
-	MoreResources
-
-	// ExpectedID is the expected AgentID of the autoscaler-agent
-	ExpectedID uuid.UUID `json:"expectedID"`
-}
-
 // MoreResources holds the data associated with a MoreResourcesRequest
 type MoreResources struct {
-	// Cpu is true if the VM informant is requesting more CPU
+	// Cpu is true if the vm-monitor is requesting more CPU
 	Cpu bool `json:"cpu"`
-	// Memory is true if the VM informant is requesting more memory
+	// Memory is true if the vm-monitor is requesting more memory
 	Memory bool `json:"memory"`
 }
 
@@ -539,51 +298,6 @@ func (m MoreResources) And(cmp MoreResources) MoreResources {
 	}
 }
 
-// RawResources signals raw resource amounts, and is primarily used in communications with the VM
-// informant because it doesn't know about things like memory slots.
-//
-// This is used in protocol versions <2. In later versions, AgentResourceMessage is used.
-type RawResources struct {
-	Cpu    *resource.Quantity `json:"cpu"`
-	Memory *resource.Quantity `json:"memory"`
-}
-
-type AgentResourceMessage = AgentMessage[ResourceMessage]
-
-// Similar to RawResources, stores raw resource amounts. However, it also stores the ID of the agent
-// notifying the VM of a change in resources. In protocol versions 2 and on, agents notify VM's of
-// changes to their available resources with an AgentResourceMessage. This allows VM informants to verify
-// the authenticity of the agent responding.
-type ResourceMessage struct {
-	RawResources
-	Id AgentIdentification `json:"id"`
-}
-
-// DownscaleResult is used by the VM informant to return whether it downscaled successfully, and
-// some indication of its status when doing so.
-//
-// DownscaleResult is also used in informant-monitor communications. The monitor can send a
-// DownscaleResult, which is then propagated to the informant.
-type DownscaleResult struct {
-	Ok     bool
-	Status string
-}
-
-// SuspendAgent is sent from the VM informant to the autoscaler-agent when it has been contacted by
-// a new autoscaler-agent and wishes to switch to that instead
-//
-// Instead of just cutting off any connection(s) to the agent, the informant keeps it around in case
-// the new one fails and it needs to fall back to the old one.
-type SuspendAgent struct {
-	ExpectedID uuid.UUID `json:"expectedID"`
-}
-
-// ResumeAgent is sent from the VM informant to the autoscaler-agent to resume contact when it was
-// previously suspended.
-type ResumeAgent struct {
-	ExpectedID uuid.UUID `json:"expectedID"`
-}
-
 ////////////////////////////////////
 // Controller <-> Runner Messages //
 ////////////////////////////////////
@@ -613,7 +327,7 @@ func (v RunnerProtoVersion) SupportsCgroupFractionalCPU() bool {
 }
 
 ////////////////////////////////////
-// Informant <-> Monitor Messages //
+//  Agent <-> Monitor Messages    //
 ////////////////////////////////////
 
 // Represents the resources that a VM has been granted
@@ -627,22 +341,25 @@ type Allocation struct {
 
 // ** Types sent by monitor **
 
-// This type is sent to the informant as a way to request immediate upscale.
-// Since the informant cannot control if the agent will choose to upscale the VM,
-// it does not return anything. However, it should internally request an upscale
-// from the agent. If an upscale is granted, the monitor will be notified via an
-// UpscaleNotification.
+// This type is sent to the agent as a way to request immediate upscale.
+// Since the monitor cannot control whether the agent will choose to upscale the
+// VM, it does not return anything. If an upscale is granted, the agent will
+// notify the monitor via an UpscaleNotification.
 type UpscaleRequest struct{}
 
-// This type is sent to the informant to confirm it successfully upscaled, meaning
-// it increased its filecache and/or cgroup memory limits. The informant does not
+// This type is sent to the agent to confirm it successfully upscaled, meaning
+// it increased its filecache and/or cgroup memory limits. The agent does not
 // need to respond.
 type UpscaleConfirmation struct{}
 
-// `api.DownscaleResult` is also sent to the informant after the monitor tries to
-// downscale
+// This type is sent to the agent to indicate if downscaling was successful. The
+// agent does not need to respond.
+type DownscaleResult struct {
+	Ok     bool
+	Status string
+}
 
-// ** Types sent by informant **
+// ** Types sent by agent **
 
 // This type is sent to the monitor to inform it that it has been granted a greater
 // allocation. Once the monitor is done applying this new allocation (i.e., increasing
@@ -653,12 +370,12 @@ type UpscaleNotification struct {
 
 // This type is sent to the monitor as a request to downscale its resource usage.
 // Once the monitor has downscaled or failed to do so, it should respond with a
-// api.DownscaleResult (listed in the informant<->agent protocol section).
+// DownscaleResult.
 type DownscaleRequest struct {
 	Target Allocation `json:"target"`
 }
 
-// ** Types shared by informant and monitor **
+// ** Types shared by agent and monitor **
 
 // This type can be sent by either party whenever they receive a message they
 // cannot deserialize properly.
@@ -675,7 +392,7 @@ type InternalError struct {
 }
 
 // This type is sent as part of a bidirectional heartbeat between the monitor and
-// informant. The check is initiated by the informant.
+// agent. The check is initiated by the agent.
 type HealthCheck struct{}
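Taken together, these types pair up into three exchanges: UpscaleRequest from the monitor leads to an UpscaleNotification from the agent, answered by UpscaleConfirmation; DownscaleRequest is answered by DownscaleResult; and HealthCheck is echoed back. A hypothetical monitor-side dispatch illustrating the pairing (the real vm-monitor is a separate program, and the pkg/api import path is assumed here):

```go
package example

import "github.com/neondatabase/autoscaling/pkg/api"

// handleAgentMessage sketches which monitor-sent reply each agent-sent
// message expects. Illustration only; this is not the monitor's real code.
func handleAgentMessage(msg any) any {
	switch m := msg.(type) {
	case api.UpscaleNotification:
		// Apply m.Granted (raise cgroup / file cache limits), then confirm.
		_ = m.Granted
		return api.UpscaleConfirmation{}
	case api.DownscaleRequest:
		// Try to free resources down to m.Target, then report the verdict.
		_ = m.Target
		return api.DownscaleResult{Ok: true, Status: "ok"}
	case api.HealthCheck:
		// Heartbeats are initiated by the agent and echoed back.
		return api.HealthCheck{}
	default:
		// Real code would reply with api.InvalidMessage; its fields are
		// elided from the hunk above, so it is omitted here.
		return nil
	}
}
```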
 
 // This function is used to prepare a message for serialization. Any data passed
@@ -686,7 +403,7 @@ type HealthCheck struct{}
 // - InvalidMessage
 // - InternalError
 // - HealthCheck
-func SerializeInformantMessage(content any, id uint64) ([]byte, error) {
+func SerializeMonitorMessage(content any, id uint64) ([]byte, error) {
 	// The final type that gets sent over the wire
 	type Bundle struct {
 		Content any `json:"content"`
@@ -717,7 +434,7 @@ func SerializeInformantMessage(content any, id uint64) ([]byte, error) {
 	})
 }
 
-// MonitorProtoVersion represents a single version of the informant<->monitor protocol
+// MonitorProtoVersion represents a single version of the agent<->monitor protocol
 //
 // Each version of the agent<->monitor protocol is named independently from releases of the
 // repository containing this code. Names follow semver, although this does not necessarily
diff --git a/tests/e2e/autoscaling/00-create-vm.yaml b/tests/e2e/autoscaling/00-create-vm.yaml
index b9152e95a..006406785 100644
--- a/tests/e2e/autoscaling/00-create-vm.yaml
+++ b/tests/e2e/autoscaling/00-create-vm.yaml
@@ -51,7 +51,7 @@ spec:
         port: 5432
       - name: host-metrics
         port: 9100
-      - name: informant
+      - name: monitor
         port: 10301
   extraNetwork:
     enable: true
diff --git a/vm-deploy.yaml b/vm-deploy.yaml
index d7a69403c..93fa211cd 100644
--- a/vm-deploy.yaml
+++ b/vm-deploy.yaml
@@ -23,4 +23,4 @@ spec:
     - port: 22 # ssh
     - port: 5432 # postgres
     - port: 9100 # metrics
-    - port: 10301 # informant
+    - port: 10301 # monitor
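One note on SerializeMonitorMessage: the hunk above only shows Bundle's Content field. Assuming the envelope also carries a message type tag (matching the strings like "DownscaleRequest" passed to Dispatcher.Call) and the function's id argument as a sequence number, the wire format would look roughly like this standalone sketch:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical reconstruction of the wire envelope. Only Content is visible
// in the diff; the type and id fields are assumed from the function's
// signature and the dispatcher's use of message type strings.
type bundle struct {
	Content any    `json:"content"`
	Type    string `json:"type"`
	Id      uint64 `json:"id"`
}

func main() {
	b, err := json.Marshal(bundle{
		Content: struct{}{}, // e.g. an empty HealthCheck
		Type:    "HealthCheck",
		Id:      42,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"content":{},"type":"HealthCheck","id":42}
}
```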