Skip to content

Commit

Permalink
monitor/compute_ctl: remove references to the informant (#5115)
Browse files Browse the repository at this point in the history
Also added some docs to the monitor :)

Co-authored-by: Em Sharnoff <[email protected]>
  • Loading branch information
fprasx and sharnoff authored Aug 28, 2023
1 parent e40ee7c commit 85d6d9d
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 50 deletions.
7 changes: 4 additions & 3 deletions compute_tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ Also `compute_ctl` spawns two separate service threads:
- `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
last activity requests.

If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
downscaling and (eventually) will request immediate upscaling under resource pressure.
If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
`vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
`vm-monitor` communicates with the VM autoscaling system. It coordinates
downscaling and requests immediate upscaling under resource pressure.

Usage example:
```sh
Expand Down
7 changes: 4 additions & 3 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
//! last activity requests.
//!
//! If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
//! compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
//! downscaling and (eventually) will request immediate upscaling under resource pressure.
//! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh
Expand Down
16 changes: 16 additions & 0 deletions libs/vm_monitor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,19 @@ in the `neon-postgres` cgroup and set its `memory.{max,high}`.
* See also: [`neondatabase/vm-monitor`](https://github.com/neondatabase/vm-monitor/),
where initial development of the monitor happened. The repository is no longer
maintained but the commit history may be useful for debugging.

## Structure

The `vm-monitor` is loosely comprised of a few systems. These are:
* the server: this is just a simple `axum` server that accepts requests and
upgrades them to websocket connections. The server only allows one connection at
a time. This means that upon receiving a new connection, the server will terminate
and old one if it exists.
* the filecache: a struct that allows communication with the Postgres file cache.
On startup, we connect to the filecache and hold on to the connection for the
entire monitor lifetime.
* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by
listening for `memory.high` events and setting its `memory.{high,max}` values.
* the runner: the runner marries the filecache and cgroup watcher together,
communicating with the agent throught the `Dispatcher`, and then calling filecache
and cgroup watcher functions as needed to upscale and downscale
36 changes: 17 additions & 19 deletions libs/vm_monitor/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Managing the websocket connection and other signals in the monitor.
//!
//! Contains types that manage the interaction (not data interchange, see `protocol`)
//! between informant and monitor, allowing us to to process and send messages in a
//! between agent and monitor, allowing us to to process and send messages in a
//! straightforward way. The dispatcher also manages that signals that come from
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).
Expand All @@ -24,16 +24,16 @@ use crate::protocol::{
/// The central handler for all communications in the monitor.
///
/// The dispatcher has two purposes:
/// 1. Manage the connection to the informant, sending and receiving messages.
/// 1. Manage the connection to the agent, sending and receiving messages.
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
/// and sending a message to the informant when the cgroup manager requests
/// and sending a message to the agent when the cgroup manager requests
/// upscale.
#[derive(Debug)]
pub struct Dispatcher {
/// We read informant messages of of `source`
/// We read agent messages of of `source`
pub(crate) source: SplitStream<WebSocket>,

/// We send messages to the informant through `sink`
/// We send messages to the agent through `sink`
sink: SplitSink<WebSocket, Message>,

/// Used to notify the cgroup when we are upscaled.
Expand All @@ -43,7 +43,7 @@ pub struct Dispatcher {
/// we send an `UpscaleRequst` to the agent.
pub(crate) request_upscale_events: mpsc::Receiver<()>,

/// The protocol version we have agreed to use with the informant. This is negotiated
/// The protocol version we have agreed to use with the agent. This is negotiated
/// during the creation of the dispatcher, and should be the highest shared protocol
/// version.
///
Expand All @@ -56,9 +56,9 @@ pub struct Dispatcher {
impl Dispatcher {
/// Creates a new dispatcher using the passed-in connection.
///
/// Performs a negotiation with the informant to determine the highest protocol
/// Performs a negotiation with the agent to determine the highest protocol
/// version that both support. This consists of two steps:
/// 1. Wait for the informant to sent the range of protocols it supports.
/// 1. Wait for the agent to sent the range of protocols it supports.
/// 2. Send a protocol version that works for us as well, or an error if there
/// is no compatible version.
pub async fn new(
Expand All @@ -69,7 +69,7 @@ impl Dispatcher {
let (mut sink, mut source) = stream.split();

// Figure out the highest protocol version we both support
info!("waiting for informant to send protocol version range");
info!("waiting for agent to send protocol version range");
let Some(message) = source.next().await else {
bail!("websocket connection closed while performing protocol handshake")
};
Expand All @@ -79,7 +79,7 @@ impl Dispatcher {
let Message::Text(message_text) = message else {
// All messages should be in text form, since we don't do any
// pinging/ponging. See nhooyr/websocket's implementation and the
// informant/agent for more info
// agent for more info
bail!("received non-text message during proocol handshake: {message:?}")
};

Expand All @@ -88,32 +88,30 @@ impl Dispatcher {
max: PROTOCOL_MAX_VERSION,
};

let informant_range: ProtocolRange = serde_json::from_str(&message_text)
let agent_range: ProtocolRange = serde_json::from_str(&message_text)
.context("failed to deserialize protocol version range")?;

info!(range = ?informant_range, "received protocol version range");
info!(range = ?agent_range, "received protocol version range");

let highest_shared_version = match monitor_range.highest_shared_version(&informant_range) {
let highest_shared_version = match monitor_range.highest_shared_version(&agent_range) {
Ok(version) => {
sink.send(Message::Text(
serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(),
))
.await
.context("failed to notify informant of negotiated protocol version")?;
.context("failed to notify agent of negotiated protocol version")?;
version
}
Err(e) => {
sink.send(Message::Text(
serde_json::to_string(&ProtocolResponse::Error(format!(
"Received protocol version range {} which does not overlap with {}",
informant_range, monitor_range
agent_range, monitor_range
)))
.unwrap(),
))
.await
.context(
"failed to notify informant of no overlap between protocol version ranges",
)?;
.context("failed to notify agent of no overlap between protocol version ranges")?;
Err(e).context("error determining suitable protocol version range")?
}
};
Expand All @@ -137,7 +135,7 @@ impl Dispatcher {
.context("failed to send resources and oneshot sender across channel")
}

/// Send a message to the informant.
/// Send a message to the agent.
///
/// Although this function is small, it has one major benefit: it is the only
/// way to send data accross the connection, and you can only pass in a proper
Expand Down
4 changes: 2 additions & 2 deletions libs/vm_monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res

/// Handles incoming websocket connections.
///
/// If we are already to connected to an informant, we kill that old connection
/// If we are already to connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn start_monitor(
return;
}
};
info!("connected to informant");
info!("connected to agent");

match monitor.run().await {
Ok(()) => info!("monitor was killed due to new connection"),
Expand Down
34 changes: 17 additions & 17 deletions libs/vm_monitor/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Types representing protocols and actual informant-monitor messages.
//! Types representing protocols and actual agent-monitor messages.
//!
//! The pervasive use of serde modifiers throughout this module is to ease
//! serialization on the go side. Because go does not have enums (which model
//! messages well), it is harder to model messages, and we accomodate that with
//! serde.
//!
//! *Note*: the informant sends and receives messages in different ways.
//! *Note*: the agent sends and receives messages in different ways.
//!
//! The informant serializes messages in the form and then sends them. The use
//! The agent serializes messages in the form and then sends them. The use
//! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
//! to determine how to deserialize `Content`.
//! ```ignore
Expand All @@ -25,9 +25,9 @@
//! Id uint64
//! }
//! ```
//! After reading the type field, the informant will decode the entire message
//! After reading the type field, the agent will decode the entire message
//! again, this time into the correct type using the embedded fields.
//! Because the informant cannot just extract the json contained in a certain field
//! Because the agent cannot just extract the json contained in a certain field
//! (it initially deserializes to `map[string]interface{}`), we keep the fields
//! at the top level, so the entire piece of json can be deserialized into a struct,
//! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
Expand All @@ -37,7 +37,7 @@ use std::cmp;

use serde::{de::Error, Deserialize, Serialize};

/// A Message we send to the informant.
/// A Message we send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OutboundMsg {
#[serde(flatten)]
Expand All @@ -51,31 +51,31 @@ impl OutboundMsg {
}
}

/// The different underlying message types we can send to the informant.
/// The different underlying message types we can send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
pub enum OutboundMsgKind {
/// Indicates that the informant sent an invalid message, i.e, we couldn't
/// Indicates that the agent sent an invalid message, i.e, we couldn't
/// properly deserialize it.
InvalidMessage { error: String },
/// Indicates that we experienced an internal error while processing a message.
/// For example, if a cgroup operation fails while trying to handle an upscale,
/// we return `InternalError`.
InternalError { error: String },
/// Returned to the informant once we have finished handling an upscale. If the
/// Returned to the agent once we have finished handling an upscale. If the
/// handling was unsuccessful, an `InternalError` will get returned instead.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleConfirmation {},
/// Indicates to the monitor that we are urgently requesting resources.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleRequest {},
/// Returned to the informant once we have finished attempting to downscale. If
/// Returned to the agent once we have finished attempting to downscale. If
/// an error occured trying to do so, an `InternalError` will get returned instead.
/// However, if we are simply unsuccessful (for example, do to needing the resources),
/// that gets included in the `DownscaleResult`.
DownscaleResult {
// FIXME for the future (once the informant is deprecated)
// As of the time of writing, the informant/agent version of this struct is
// As of the time of writing, the agent/informant version of this struct is
// called api.DownscaleResult. This struct has uppercase fields which are
// serialized as such. Thus, we serialize using uppercase names so we don't
// have to make a breaking change to the agent<->informant protocol. Once
Expand All @@ -88,20 +88,20 @@ pub enum OutboundMsgKind {
status: String,
},
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
/// informant.
/// agent.
/// *Note*: this is a struct variant because of the way go serializes struct{}
HealthCheck {},
}

/// A message received form the informant.
/// A message received form the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InboundMsg {
#[serde(flatten)]
pub(crate) inner: InboundMsgKind,
pub(crate) id: usize,
}

/// The different underlying message types we can receive from the informant.
/// The different underlying message types we can receive from the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type", content = "content")]
pub enum InboundMsgKind {
Expand All @@ -120,14 +120,14 @@ pub enum InboundMsgKind {
/// when done.
DownscaleRequest { target: Resources },
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
/// informant.
/// agent.
/// *Note*: this is a struct variant because of the way go serializes struct{}
HealthCheck {},
}

/// Represents the resources granted to a VM.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
// Renamed because the agent/informant has multiple resources types:
// Renamed because the agent has multiple resources types:
// `Resources` (milliCPU/memory slots)
// `Allocation` (vCPU/bytes) <- what we correspond to
#[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
Expand All @@ -151,7 +151,7 @@ pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
pub struct ProtocolVersion(u8);

impl ProtocolVersion {
/// Represents v1.0 of the informant<-> monitor protocol - the initial version
/// Represents v1.0 of the agent<-> monitor protocol - the initial version
///
/// Currently the latest version.
const V1_0: ProtocolVersion = ProtocolVersion(1);
Expand Down
12 changes: 6 additions & 6 deletions libs/vm_monitor/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Exposes the `Runner`, which handles messages received from informant and
//! Exposes the `Runner`, which handles messages received from agent and
//! sends upscale requests.
//!
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
Expand All @@ -21,8 +21,8 @@ use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};

/// Central struct that interacts with informant, dispatcher, and cgroup to handle
/// signals from the informant.
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
config: Config,
Expand Down Expand Up @@ -371,7 +371,7 @@ impl Runner {
Ok(None)
}
InboundMsgKind::InternalError { error } => {
warn!(error, id, "informant experienced an internal error");
warn!(error, id, "agent experienced an internal error");
Ok(None)
}
InboundMsgKind::HealthCheck {} => {
Expand Down Expand Up @@ -405,7 +405,7 @@ impl Runner {
.await
.context("failed to send message")?;
}
// there is a message from the informant
// there is a message from the agent
msg = self.dispatcher.source.next() => {
if let Some(msg) = msg {
// Don't use 'message' as a key as the string also uses
Expand All @@ -422,7 +422,7 @@ impl Runner {
// Don't use 'message' as a key as the
// string also uses that for its key
msg = ?other,
"informant should only send text messages but received different type"
"agent should only send text messages but received different type"
);
continue
},
Expand Down

1 comment on commit 85d6d9d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1692 tests run: 1609 passed, 0 failed, 83 skipped (full report)


The comment gets automatically updated with the latest test results
85d6d9d at 2023-08-29T00:42:48.189Z :recycle:

Please sign in to comment.