From 373eb7fc043e548986bf97576b252b0c586660ce Mon Sep 17 00:00:00 2001
From: Em Sharnoff
Date: Thu, 5 Oct 2023 22:20:01 -0700
Subject: [PATCH] vm-monitor: Refactor scaling logic into CgroupWatcher

The general idea of this PR is to move the on-downscale and on-upscale
cgroup handling logic into the CgroupWatcher itself via message passing
of commands, rather than directly acting on the cgroup from the thread
handling the websocket message.

This change is the main prerequisite for a handful of smaller changes
that should be much easier to make on top of it, all part of the Epic
about fixing memory.high throttling (#5444):

1. Fix a potential race condition wherein the logic that increases
   memory.high in response to memory.high events could overwrite newer
   (more permissive) values set by actual upscaling - **Handled by this
   change already!**
2. Fix a bug where, because memory.high has already been increased to
   avoid throttling, upscaling actually decreases memory.high and leads
   to unrecoverable throttling.
3. If memory.high has been increased to avoid throttling but no
   upscaling has happened, periodically try to decrease back to the
   desired memory.high.

For more general context, refer to #5444.
---
 Cargo.lock                        |   1 +
 libs/vm_monitor/Cargo.toml        |   1 +
 libs/vm_monitor/src/cgroup.rs     | 718 +++++++++++++++++++-----------
 libs/vm_monitor/src/dispatcher.rs |  18 +-
 libs/vm_monitor/src/filecache.rs  |  57 ++-
 libs/vm_monitor/src/runner.rs     | 176 +++-----
 6 files changed, 561 insertions(+), 410 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index be3f179d5fbd..d2b5b659d7b6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5190,6 +5190,7 @@ dependencies = [
  "clap",
  "futures",
  "inotify 0.10.2",
+ "pin-project-lite",
  "serde",
  "serde_json",
  "sysinfo",
diff --git a/libs/vm_monitor/Cargo.toml b/libs/vm_monitor/Cargo.toml
index 26b976830a7a..b03a6191adc5 100644
--- a/libs/vm_monitor/Cargo.toml
+++ b/libs/vm_monitor/Cargo.toml
@@ -29,3 +29,4 @@ workspace_hack = { version = "0.1", path = "../../workspace_hack" }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 cgroups-rs = "0.3.3"
+pin-project-lite.workspace = true
diff --git a/libs/vm_monitor/src/cgroup.rs b/libs/vm_monitor/src/cgroup.rs
index 15e972505e46..2996337b0d6f 100644
--- a/libs/vm_monitor/src/cgroup.rs
+++ b/libs/vm_monitor/src/cgroup.rs
@@ -5,7 +5,7 @@ use std::{
     sync::atomic::{AtomicU64, Ordering},
 };
 
-use anyhow::{anyhow, bail, Context};
+use anyhow::{anyhow, Context};
 use cgroups_rs::{
     freezer::FreezerController,
     hierarchies::{self, is_cgroup2_unified_mode, UNIFIED_MOUNTPOINT},
@@ -14,13 +14,13 @@ use cgroups_rs::{
     Subsystem::{Freezer, Mem},
 };
 use inotify::{EventStream, Inotify, WatchMask};
-use tokio::sync::mpsc::{self, error::TryRecvError};
+use pin_project_lite::pin_project;
+use tokio::sync::{mpsc, oneshot};
 use tokio::time::{Duration, Instant};
 use tokio_stream::{Stream, StreamExt};
-use tracing::{info, warn};
+use tracing::{error, info, warn};
 
-use crate::protocol::Resources;
-use crate::MiB;
+use crate::{bytes_to_mebibytes, MiB};
 
 /// Monotonically increasing counter of the number of memory.high events
 /// the cgroup has experienced.
@@ -88,7 +88,7 @@ pub struct Config {
     //
     // TODO: there's some minor issues with this approach -- in particular, that we might have
     // memory in use by the kernel's page cache that we're actually ok with getting rid of.
- pub(crate) memory_high_buffer_bytes: u64, + memory_high_buffer_bytes: u64, // The maximum duration, in milliseconds, that we're allowed to pause // the cgroup for while waiting for the autoscaler-agent to upscale us @@ -145,7 +145,7 @@ impl Default for Config { /// a unique sequence number. Sequence numbers are monotonically increasing, /// allowing us to answer questions like "did this upscale happen after this /// memory.high event?" by comparing the sequence numbers of the two events. -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] pub struct Sequenced { seqnum: u64, data: T, @@ -168,23 +168,12 @@ impl Sequenced { /// cgroup happy. #[derive(Debug)] pub struct CgroupWatcher { - pub config: Config, - - /// The sequence number of the last upscale. - /// - /// If we receive a memory.high event that has a _lower_ sequence number than - /// `last_upscale_seqnum`, then we know it occured before the upscale, and we - /// can safely ignore it. - /// - /// Note: Like the `events` field, this doesn't _need_ interior mutability but we - /// use it anyways so that methods take `&self`, not `&mut self`. - last_upscale_seqnum: AtomicU64, - - /// A channel on which we send messages to request upscale from the dispatcher. - upscale_requester: mpsc::Sender<()>, + config: Config, /// The actual cgroup we are watching and managing. cgroup: cgroups_rs::Cgroup, + + command_sender: mpsc::Sender<(CgroupCommand, oneshot::Sender)>, } /// Read memory.events for the desired event type. @@ -235,9 +224,11 @@ impl CgroupWatcher { #[tracing::instrument(skip_all, fields(%name))] pub fn new( name: String, - // A channel on which to send upscale requests - upscale_requester: mpsc::Sender<()>, - ) -> anyhow::Result<(Self, impl Stream>)> { + ) -> anyhow::Result<( + Self, + CgroupWatchController, + impl Stream>, + )> { // TODO: clarify exactly why we need v2 // Make sure cgroups v2 (aka unified) are supported if !is_cgroup2_unified_mode() { @@ -284,275 +275,481 @@ impl CgroupWatcher { // running in the cgroup before that caused it to be non-zero. MEMORY_EVENT_COUNT.fetch_max(initial_count, Ordering::AcqRel); + let (command_tx, command_rx) = mpsc::channel(1); + Ok(( Self { - cgroup, - upscale_requester, - last_upscale_seqnum: AtomicU64::new(0), config: Default::default(), + cgroup, + command_sender: command_tx, + }, + CgroupWatchController { + commands: command_rx, }, memory_events, )) } + async fn do_command(&self, cmd: CgroupCommand) -> anyhow::Result { + let (tx_result, rx_result) = oneshot::channel(); + + self.command_sender + .send((cmd, tx_result)) + .await + .map_err(|_| anyhow!("cgroup command channel receiver dropped")) + .context("failed to send internal cgroup command")?; + + rx_result + .await + .context("failed to receive internal cgroup command response") + } + + pub async fn unset_memory_high(&self) -> anyhow::Result<()> { + match self.do_command(CgroupCommand::UnsetMemoryHigh).await? { + CgroupCommandResult::Failure { .. } => { + unreachable!("UnsetMemoryHigh command should never return graceful failure") + } + CgroupCommandResult::Success { message } => { + info!(status = message, "cgroup unset_memory_high successful"); + Ok(()) + } + } + } + + pub async fn set_initial_memory_high(&self, mem_size: MemorySize) -> anyhow::Result<()> { + match self + .do_command(CgroupCommand::SetInitialMemoryHigh(mem_size)) + .await? + { + CgroupCommandResult::Failure { .. 
} => { + unreachable!("SetInitialMemoryHigh command should never return graceful failure") + } + CgroupCommandResult::Success { message } => { + info!( + status = message, + "cgroup set_initial_memory_high successful" + ); + Ok(()) + } + } + } + + pub async fn downscale( + &self, + target_mem_size: MemorySize, + ) -> anyhow::Result> { + self.do_command(CgroupCommand::Downscale(Sequenced::new(target_mem_size))) + .await + .map(|res| match res { + CgroupCommandResult::Failure { message } => Err(message), + CgroupCommandResult::Success { message } => Ok(message), + }) + } + + pub async fn upscale(&self, mem_size: MemorySize) -> anyhow::Result<()> { + match self + .do_command(CgroupCommand::Upscale(Sequenced::new(mem_size))) + .await? + { + CgroupCommandResult::Failure { .. } => { + unreachable!("Upscale command should never return graceful failure") + } + CgroupCommandResult::Success { message } => { + info!(status = message, "cgroup upscale successful"); + Ok(()) + } + } + } +} + +pub struct CgroupWatchController { + commands: mpsc::Receiver<(CgroupCommand, oneshot::Sender)>, +} + +#[derive(Debug, Copy, Clone)] +pub struct MemorySize { + pub bytes: u64, +} + +#[derive(Debug, Copy, Clone)] +enum CgroupCommand { + UnsetMemoryHigh, + SetInitialMemoryHigh(MemorySize), + Downscale(Sequenced), + Upscale(Sequenced), +} + +enum CgroupCommandResult { + Success { message: String }, + Failure { message: String }, +} + +pin_project! { + /// Object storing the state inside of [`CgroupWatcher::watch`] + #[project = CgroupWatcherStateProjected] + struct CgroupWatcherState { + // If not `None`, the time at which we last increased `memory.high` in order to reduce the + // risk of throttling while waiting for upscaling. + // + // We're not allowed to increase `memory.high` more often than + // `Config.memory_high_increase_every`. + last_memory_high_increase_at: Option, + + // If not `None`, the time at which we last froze the cgroup. We're not allowed to freeze + // the cgroup more often than `Config.do_not_freeze_more_often_than`. + last_frozen_at: Option, + + // Timer representing when we must unfreeze the cgroup, if we believe it's currently frozen + // + // This gets set on memory.high events when `wait_to_freeze` is elapsed, and is never more + // than `Config.max_upscale_wait`. + #[pin] + must_unfreeze_at: Option, + + // True if we've requested upscaling that hasn't yet happened + waiting_on_upscale: bool, + + last_upscale_seqnum: Option, + } +} + +// `CgroupWatcher::watch` and supporting methods: +impl CgroupWatcher { /// The entrypoint for the `CgroupWatcher`. #[tracing::instrument(skip_all)] pub async fn watch( &self, - // These are ~dependency injected~ (fancy, I know) because this function - // should never return. - // -> therefore: when we tokio::spawn it, we don't await the JoinHandle. - // -> therefore: if we want to stick it in an Arc so many threads can access - // it, methods can never take mutable access. - // - note: we use the Arc strategy so that a) we can call this function - // right here and b) the runner can call the set/get_memory methods - // -> since calling recv() on a tokio::sync::mpsc::Receiver takes &mut self, - // we just pass them in here instead of holding them in fields, as that - // would require this method to take &mut self. 
- mut upscales: mpsc::Receiver>, + mut controller: CgroupWatchController, + upscale_requester: mpsc::Sender<()>, events: E, ) -> anyhow::Result<()> where E: Stream>, { - let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO)); - let mut last_memory_high_increase_at: Option = None; let mut events = pin!(events); - // Are we waiting to be upscaled? Could be true if we request upscale due - // to a memory.high event and it does not arrive in time. - let mut waiting_on_upscale = false; + let state = pin!(CgroupWatcherState { + last_memory_high_increase_at: None, + last_frozen_at: None, + must_unfreeze_at: None, + waiting_on_upscale: false, + last_upscale_seqnum: None, + }); + let mut state = state.project(); loop { tokio::select! { - upscale = upscales.recv() => { - let Sequenced { seqnum, data } = upscale - .context("failed to listen on upscale notification channel")?; - waiting_on_upscale = false; - last_memory_high_increase_at = None; - self.last_upscale_seqnum.store(seqnum, Ordering::Release); - info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale"); - } - event = events.next() => { - let Some(Sequenced { seqnum, .. }) = event else { - bail!("failed to listen for memory.high events") - }; - // The memory.high came before our last upscale, so we consider - // it resolved - if self.last_upscale_seqnum.fetch_max(seqnum, Ordering::AcqRel) > seqnum { - info!( - "received memory.high event, but it came before our last upscale -> ignoring it" - ); - continue; - } - - // The memory.high came after our latest upscale. We don't - // want to do anything yet, so peek the next event in hopes - // that it's an upscale. - if let Some(upscale_num) = self - .upscaled(&mut upscales) - .context("failed to check if we were upscaled")? - { - if upscale_num > seqnum { - info!( - "received memory.high event, but it came before our last upscale -> ignoring it" - ); - continue; - } - } - - // If it's been long enough since we last froze, freeze the - // cgroup and request upscale - if wait_to_freeze.is_elapsed() { - info!("received memory.high event -> requesting upscale"); - waiting_on_upscale = self - .handle_memory_high_event(&mut upscales) - .await - .context("failed to handle upscale")?; - wait_to_freeze - .as_mut() - .reset(Instant::now() + self.config.do_not_freeze_more_often_than); - continue; - } - - // Ok, we can't freeze, just request upscale - if !waiting_on_upscale { - info!("received memory.high event, but too soon to refreeze -> requesting upscale"); - - // Make check to make sure we haven't been upscaled in the - // meantine (can happen if the agent independently decides - // to upscale us again) - if self - .upscaled(&mut upscales) - .context("failed to check if we were upscaled")? - .is_some() - { - info!("no need to request upscaling because we got upscaled"); - continue; - } - self.upscale_requester - .send(()) - .await - .context("failed to request upscale")?; - waiting_on_upscale = true; + // We want the select! to be biased so that we always unfreeze if time's up, rather + // than e.g. spinning on incoming commands or memory.high events. 
+ biased; + + // convert &mut Pin<&mut Option> (needed so we don't move out of must_unfreeze_at) + // → Pin<&mut Option> + // → Option> (so we can unwrap) + // → Pin<&mut Sleep> (so we get a Future out of it) + _ = state.must_unfreeze_at.as_mut().as_pin_mut().unwrap(), if state.must_unfreeze_at.is_some() => { + info!("cgroup freeze limit expired without getting upscaled, thawing cgroup"); + self.thaw()?; + state.must_unfreeze_at.set(None); // No longer need to unfreeze + }, + + // If the `Runner` has issued a command, then we should process that. + command_opt = controller.commands.recv() => { + let (command, result_sender) = command_opt.ok_or_else(|| anyhow!("commands event stream closed"))?; + if result_sender.is_closed() { + warn!(?command, "skipping command because result sender is closed"); continue; } - // Shoot, we can't freeze or and we're still waiting on upscale, - // increase memory.high to reduce throttling - let can_increase_memory_high = match last_memory_high_increase_at { - None => true, - Some(t) => t.elapsed() > self.config.memory_high_increase_every, + let (ty, command_result) = match command { + CgroupCommand::UnsetMemoryHigh => { + ("'unset memory.high'", self.handle_unset_memory_high(&mut state)?) + }, + CgroupCommand::SetInitialMemoryHigh(mem_size) => { + ("'set initial memory.high'", self.handle_set_initial_memory_high(&mut state, mem_size)?) + }, + CgroupCommand::Upscale(Sequenced { seqnum, data: resources }) => { + ("upscale", self.handle_upscale_command(&mut state, seqnum, resources)?) + }, + CgroupCommand::Downscale(Sequenced { seqnum, data: resources }) => { + ("downscale", self.handle_downscale_command(&mut state, seqnum, resources)?) + }, }; - if can_increase_memory_high { - info!( - "received memory.high event, \ - but too soon to refreeze and already requested upscale \ - -> increasing memory.high" - ); - - // Make check to make sure we haven't been upscaled in the - // meantine (can happen if the agent independently decides - // to upscale us again) - if self - .upscaled(&mut upscales) - .context("failed to check if we were upscaled")? - .is_some() - { - info!("no need to increase memory.high because got upscaled"); - continue; - } - - // Request upscale anyways (the agent will handle deduplicating - // requests) - self.upscale_requester - .send(()) - .await - .context("failed to request upscale")?; - - let memory_high = - self.get_memory_high_bytes().context("failed to get memory.high")?; - let new_high = memory_high + self.config.memory_high_increase_by_bytes; - info!( - current_high_bytes = memory_high, - new_high_bytes = new_high, - "updating memory.high" - ); - self.set_memory_high_bytes(new_high) - .context("failed to set memory.high")?; - last_memory_high_increase_at = Some(Instant::now()); - continue; + + if let Err(_) = result_sender.send(command_result) { + error!("Failed to send {ty} command result for cgroup"); } + }, - info!("received memory.high event, but can't do anything"); - } + // Got a memory.high event, need to decide what to do + event = events.next() => { + let event = event.ok_or_else(|| anyhow!("memory.high event stream closed"))?; + self.handle_memory_high_event(&mut state, event, &upscale_requester).await?; + }, }; } } - /// Handle a `memory.high`, returning whether we are still waiting on upscale - /// by the time the function returns. - /// - /// The general plan for handling a `memory.high` event is as follows: - /// 1. Freeze the cgroup - /// 2. Start a timer for `self.config.max_upscale_wait` - /// 3. 
Request upscale - /// 4. After the timer elapses or we receive upscale, thaw the cgroup. - /// 5. Return whether or not we are still waiting for upscale. If we are, - /// we'll increase the cgroups memory.high to avoid getting oom killed - #[tracing::instrument(skip_all)] - async fn handle_memory_high_event( + fn handle_unset_memory_high( &self, - upscales: &mut mpsc::Receiver>, - ) -> anyhow::Result { - // Immediately freeze the cgroup before doing anything else. - info!("received memory.high event -> freezing cgroup"); - self.freeze().context("failed to freeze cgroup")?; - - // We'll use this for logging durations - let start_time = Instant::now(); - - // Await the upscale until we have to unfreeze - let timed = - tokio::time::timeout(self.config.max_upscale_wait, self.await_upscale(upscales)); - - // Request the upscale - info!( - wait = ?self.config.max_upscale_wait, - "sending request for immediate upscaling", - ); - self.upscale_requester - .send(()) - .await - .context("failed to request upscale")?; - - let waiting_on_upscale = match timed.await { - Ok(Ok(())) => { - info!(elapsed = ?start_time.elapsed(), "received upscale in time"); - false - } - // **important**: unfreeze the cgroup before ?-reporting the error - Ok(Err(e)) => { - info!("error waiting for upscale -> thawing cgroup"); - self.thaw() - .context("failed to thaw cgroup after errored waiting for upscale")?; - Err(e.context("failed to await upscale"))? + _state: &mut CgroupWatcherStateProjected, + ) -> anyhow::Result { + // We don't *really* need to fetch memory.high here, but it's nice to do in order to + // improve the quality of the logs, and it should be minimally expensive. + let message = match self + .get_memory_high() + .context("failed to get memory.high")? + { + MaxValue::Max => { + let msg = "no need to update memory.high (currently set to 'max')"; + info!("{msg}"); + msg.to_owned() } - Err(_) => { - info!(elapsed = ?self.config.max_upscale_wait, "timed out waiting for upscale"); - true + MaxValue::Value(current_memory_high_bytes) => { + info!(current_memory_high_bytes, "updating memory.high to 'max'"); + self.set_memory_high(MaxValue::Max) + .context("failed to set memory.high")?; + "memory.high set to 'max'".to_owned() } }; + Ok(CgroupCommandResult::Success { message }) + } - info!("thawing cgroup"); - self.thaw().context("failed to thaw cgroup")?; + fn handle_set_initial_memory_high( + &self, + _state: &mut CgroupWatcherStateProjected, + mem_size: MemorySize, + ) -> anyhow::Result { + let new_memory_high_bytes = self.config.calculate_memory_high_value(mem_size.bytes); + + match self + .get_memory_high() + .context("failed to get memory.high")? + { + MaxValue::Max => info!( + new_memory_high_bytes, + "updating memory.high (currently set to 'max')" + ), + MaxValue::Value(current_memory_high_bytes) => info!( + current_memory_high_bytes, + new_memory_high_bytes, "updating memory.high" + ), + } - Ok(waiting_on_upscale) + self.set_memory_high(MaxValue::Value(new_memory_high_bytes as i64)) + .context("failed to set memory.high")?; + Ok(CgroupCommandResult::Success { + message: format!( + "set cgroup memory.high to {} MiB", + bytes_to_mebibytes(new_memory_high_bytes), + ), + }) } - /// Checks whether we were just upscaled, returning the upscale's sequence - /// number if so. 
- #[tracing::instrument(skip_all)] - fn upscaled( + fn handle_upscale_command( &self, - upscales: &mut mpsc::Receiver>, - ) -> anyhow::Result> { - let Sequenced { seqnum, data } = match upscales.try_recv() { - Ok(upscale) => upscale, - Err(TryRecvError::Empty) => return Ok(None), - Err(TryRecvError::Disconnected) => { - bail!("upscale notification channel was disconnected") - } - }; + state: &mut CgroupWatcherStateProjected<'_>, + seqnum: u64, + mem_size: MemorySize, + ) -> anyhow::Result { + info!(seqnum, ?mem_size, "received upscale command"); + + // On upscaling, we want to set memory.high to the appropriate value and reset any other + // temporary conditions that may have accumulated while waiting for upscale. + + *state.waiting_on_upscale = false; + *state.last_upscale_seqnum = Some(seqnum); + + if self + .is_frozen() + .context("failed to check if cgroup is frozen")? + { + info!("thawing cgroup"); + self.thaw()?; + state.must_unfreeze_at.set(None); + } + + let new_memory_high_bytes = self.config.calculate_memory_high_value(mem_size.bytes); + + match self + .get_memory_high() + .context("failed to get memory.high")? + { + MaxValue::Max => info!( + new_memory_high_bytes, + "updating memory.high (currently set to 'max')" + ), + MaxValue::Value(current_memory_high_bytes) => info!( + current_memory_high_bytes, + new_memory_high_bytes, "updating memory.high" + ), + } + + self.set_memory_high(MaxValue::Value(new_memory_high_bytes as i64)) + .context("failed to set memory.high")?; - // Make sure to update the last upscale sequence number - self.last_upscale_seqnum.store(seqnum, Ordering::Release); - info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale"); - Ok(Some(seqnum)) + Ok(CgroupCommandResult::Success { + message: format!( + "set cgroup memory.high to {} MiB", + bytes_to_mebibytes(new_memory_high_bytes) + ), + }) } - /// Await an upscale event, discarding any `memory.high` events received in - /// the process. - /// - /// This is used in `handle_memory_high_event`, where we need to listen - /// for upscales in particular so we know if we can thaw the cgroup early. - #[tracing::instrument(skip_all)] - async fn await_upscale( + fn handle_downscale_command( + &self, + _state: &mut CgroupWatcherStateProjected<'_>, + seqnum: u64, + mem_size: MemorySize, + ) -> anyhow::Result { + info!(seqnum, ?mem_size, "received downscale command"); + + // On downscaling, we want to set memory.high, but only if the current + // memory usage is sufficiently below the target value. + + let new_memory_high_bytes = self.config.calculate_memory_high_value(mem_size.bytes); + let current_memory_usage = self + .current_memory_usage() + .context("failed to fetch cgroup memory usage")?; + + if new_memory_high_bytes < current_memory_usage + self.config.memory_high_buffer_bytes { + info!( + new_memory_high_bytes, + current_memory_usage, + buffer_bytes = self.config.memory_high_buffer_bytes, + "cgroup rejecting downscale because calculated memory.high is not sufficiently less than current usage", + ); + return Ok(CgroupCommandResult::Failure { + message: format!( + "calculated memory.high too low: {} MiB (new high) < {} (current usage) + {} (buffer)", + bytes_to_mebibytes(new_memory_high_bytes), + bytes_to_mebibytes(current_memory_usage), + bytes_to_mebibytes(self.config.memory_high_buffer_bytes), + ), + }); + } + + // Ok, memory usage is low enough, let's decrease memory.high: + match self + .get_memory_high() + .context("failed to get memory.high")? 
+ { + MaxValue::Max => info!( + new_memory_high_bytes, + "updating memory.high (currently set to 'max')" + ), + MaxValue::Value(current_memory_high_bytes) => info!( + current_memory_high_bytes, + new_memory_high_bytes, "updating memory.high" + ), + } + + self.set_memory_high(MaxValue::Value(new_memory_high_bytes as i64)) + .context("failed to set memory.high")?; + Ok(CgroupCommandResult::Success { + message: format!( + "set cgroup memory.high to {} MiB", + bytes_to_mebibytes(new_memory_high_bytes), + ), + }) + } + + async fn handle_memory_high_event( &self, - upscales: &mut mpsc::Receiver>, + state: &mut CgroupWatcherStateProjected<'_>, + event: Sequenced, + upscale_requester: &mpsc::Sender<()>, ) -> anyhow::Result<()> { - let Sequenced { seqnum, .. } = upscales - .recv() - .await - .context("error listening for upscales")?; + // The memory.high came before our last upscale, so we consider + // it resolved + if *state.last_upscale_seqnum > Some(event.seqnum) { + info!( + seqnum = event.seqnum, + last_upscale_seqnum = state + .last_upscale_seqnum + .expect("None should not be greater than Some"), + "ignoring memory.high event because it happened before before last upscale", + ); + return Ok(()); + } + + // Fetch the current time at the start, so that subsequent delays + // are more likely to under-estimate than over-estimate, which + // should help with reliability if the system is overloaded. + let now = Instant::now(); + + // The memory.high came after our latest upscale, now we need to + // decide what to do. + // + // If it's been long enough since we last froze, freeze the + // cgroup and request upscale + let long_enough_since_last_freeze = state + .last_frozen_at + .map(|t| now > t + self.config.do_not_freeze_more_often_than) + .unwrap_or(true); + if long_enough_since_last_freeze { + info!("received memory.high event, freezing cgroup and forwarding upscale request"); + + self.freeze().context("failed to freeze cgroup")?; + state.must_unfreeze_at.set(Some(tokio::time::sleep_until( + now + self.config.max_upscale_wait, + ))); + + Self::request_upscale(upscale_requester).await?; + *state.waiting_on_upscale = true; + return Ok(()); + } + + // If we're already waiting on upscaling, then increase + // memory.high (if able) to avoid throttling. + // + // In either case, we'll re-request upscaling afterwards, because + // our `Runner::run` (and downstream, the autoscaler-agent) both + // deduplicate incoming upscaling requests. + let can_increase_memory_high = state + .last_memory_high_increase_at + .map(|t| now > t + self.config.memory_high_increase_every) + .unwrap_or(true); + if *state.waiting_on_upscale && can_increase_memory_high { + info!("received memory.high event but too soon to refreeze, so increasing memory.high"); + + match self + .get_memory_high() + .context("failed to get memory.high")? 
+ { + MaxValue::Max => { + warn!("memory.high is already set to 'max', no further increases possible") + } + MaxValue::Value(current_memory_high_bytes) => { + let new_memory_high_bytes = current_memory_high_bytes + + self.config.memory_high_increase_by_bytes as i64; + info!( + current_memory_high_bytes, + new_memory_high_bytes, "updating memory.high" + ); + + self.set_memory_high(MaxValue::Value(new_memory_high_bytes)) + .context("failed to set memory.high")?; + *state.last_memory_high_increase_at = Some(now); + } + } + } else { + info!("received memory.high event, but too soon to refreeze or bump memory.high"); + } - self.last_upscale_seqnum.store(seqnum, Ordering::Release); + Self::request_upscale(upscale_requester).await?; + *state.waiting_on_upscale = true; Ok(()) } - /// Get the cgroup's name. - pub fn path(&self) -> &str { - self.cgroup.path() + // TODO: make this non-async, using something like `tokio::sync::broadcast`, + // because it doesn't really matter *how many* times we request upscaling, just + // that we've done it at some point. + async fn request_upscale(upscale_requester: &mpsc::Sender<()>) -> anyhow::Result<()> { + upscale_requester + .send(()) + .await + .context("failed to request upscale") } } @@ -572,8 +769,18 @@ impl CgroupWatcher { } } + fn is_frozen(&self) -> anyhow::Result { + use cgroups_rs::freezer::FreezerState; + + let state = self.freezer()?.state()?; + Ok(matches!( + state, + FreezerState::Freezing | FreezerState::Frozen + )) + } + /// Attempt to freeze the cgroup. - pub fn freeze(&self) -> anyhow::Result<()> { + fn freeze(&self) -> anyhow::Result<()> { self.freezer() .context("failed to get freezer subsystem")? .freeze() @@ -581,7 +788,7 @@ impl CgroupWatcher { } /// Attempt to thaw the cgroup. - pub fn thaw(&self) -> anyhow::Result<()> { + fn thaw(&self) -> anyhow::Result<()> { self.freezer() .context("failed to get freezer subsystem")? .thaw() @@ -607,7 +814,7 @@ impl CgroupWatcher { } /// Get cgroup current memory usage. - pub fn current_memory_usage(&self) -> anyhow::Result { + fn current_memory_usage(&self) -> anyhow::Result { Ok(self .memory() .context("failed to get memory subsystem")? @@ -616,16 +823,7 @@ impl CgroupWatcher { } /// Set cgroup memory.high threshold. - pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> { - self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)) - } - - /// Set the cgroup's memory.high to 'max', disabling it. - pub fn unset_memory_high(&self) -> anyhow::Result<()> { - self.set_memory_high_internal(MaxValue::Max) - } - - fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> { + fn set_memory_high(&self, value: MaxValue) -> anyhow::Result<()> { self.memory() .context("failed to get memory subsystem")? .set_mem(cgroups_rs::memory::SetMemory { @@ -638,17 +836,13 @@ impl CgroupWatcher { } /// Get memory.high threshold. - pub fn get_memory_high_bytes(&self) -> anyhow::Result { + fn get_memory_high(&self) -> anyhow::Result { let high = self .memory() .context("failed to get memory subsystem while getting memory statistics")? 
.get_mem() .map(|mem| mem.high) .context("failed to get memory statistics from subsystem")?; - match high { - Some(MaxValue::Max) => Ok(i64::MAX as u64), - Some(MaxValue::Value(high)) => Ok(high as u64), - None => anyhow::bail!("failed to read memory.high from memory subsystem"), - } + high.ok_or_else(|| anyhow!("failed to read memory.high from memory subsystem")) } } diff --git a/libs/vm_monitor/src/dispatcher.rs b/libs/vm_monitor/src/dispatcher.rs index 109a68fff196..e5f3e02873d5 100644 --- a/libs/vm_monitor/src/dispatcher.rs +++ b/libs/vm_monitor/src/dispatcher.rs @@ -15,9 +15,8 @@ use futures::{ use tokio::sync::mpsc; use tracing::info; -use crate::cgroup::Sequenced; use crate::protocol::{ - OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, Resources, PROTOCOL_MAX_VERSION, + OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION, }; @@ -36,9 +35,6 @@ pub struct Dispatcher { /// We send messages to the agent through `sink` sink: SplitSink, - /// Used to notify the cgroup when we are upscaled. - pub(crate) notify_upscale_events: mpsc::Sender>, - /// When the cgroup requests upscale it will send on this channel. In response /// we send an `UpscaleRequst` to the agent. pub(crate) request_upscale_events: mpsc::Receiver<()>, @@ -63,7 +59,6 @@ impl Dispatcher { /// is no compatible version. pub async fn new( stream: WebSocket, - notify_upscale_events: mpsc::Sender>, request_upscale_events: mpsc::Receiver<()>, ) -> anyhow::Result { let (mut sink, mut source) = stream.split(); @@ -119,22 +114,11 @@ impl Dispatcher { Ok(Self { sink, source, - notify_upscale_events, request_upscale_events, proto_version: highest_shared_version, }) } - /// Notify the cgroup manager that we have received upscale and wait for - /// the acknowledgement. - #[tracing::instrument(skip_all, fields(?resources))] - pub async fn notify_upscale(&self, resources: Sequenced) -> anyhow::Result<()> { - self.notify_upscale_events - .send(resources) - .await - .context("failed to send resources and oneshot sender across channel") - } - /// Send a message to the agent. /// /// Although this function is small, it has one major benefit: it is the only diff --git a/libs/vm_monitor/src/filecache.rs b/libs/vm_monitor/src/filecache.rs index 3a860500f120..4b0dc53979a6 100644 --- a/libs/vm_monitor/src/filecache.rs +++ b/libs/vm_monitor/src/filecache.rs @@ -2,18 +2,18 @@ use std::num::NonZeroU64; -use crate::MiB; +use crate::{bytes_to_mebibytes, MiB}; use anyhow::{anyhow, Context}; use tokio_postgres::{types::ToSql, Client, NoTls, Row}; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{error, info, warn}; /// Manages Postgres' file cache by keeping a connection open. #[derive(Debug)] pub struct FileCacheState { client: Client, conn_str: String, - pub(crate) config: FileCacheConfig, + config: FileCacheConfig, /// A token for cancelling spawned threads during shutdown. token: CancellationToken, @@ -24,7 +24,7 @@ pub struct FileCacheConfig { /// Whether the file cache is *actually* stored in memory (e.g. by writing to /// a tmpfs or shmem file). If true, the size of the file cache will be counted against the /// memory available for the cgroup. 
- pub(crate) in_memory: bool, + in_memory: bool, /// The size of the file cache, in terms of the size of the resource it consumes /// (currently: only memory) @@ -133,7 +133,10 @@ impl FileCacheConfig { } /// Calculate the desired size of the cache, given the total memory - pub fn calculate_cache_size(&self, total: u64) -> u64 { + /// + /// This isn't exposed publicly because the actual size of the cache may be limited by its + /// maximum size, so calculating the real usage requires querying the cache directly. + fn calculate_cache_size(&self, total: u64) -> u64 { // *Note*: all units are in bytes, until the very last line. let available = total.saturating_sub(self.min_remaining_after_cache.get()); if available == 0 { @@ -203,12 +206,17 @@ impl FileCacheState { Ok(client) } + /// Returns whether the config indicates the file cache is in memory + pub fn in_memory(&self) -> bool { + self.config.in_memory + } + /// Execute a query with a retry if necessary. /// /// If the initial query fails, we restart the database connection and attempt /// if again. #[tracing::instrument(skip_all, fields(%statement))] - pub async fn query_with_retry( + async fn query_with_retry( &mut self, statement: &str, params: &[&(dyn ToSql + Sync)], @@ -238,7 +246,7 @@ impl FileCacheState { } } - /// Get the current size of the file cache. + /// Get the current size of the file cache, in bytes #[tracing::instrument(skip_all)] pub async fn get_file_cache_size(&mut self) -> anyhow::Result { self.query_with_retry( @@ -258,10 +266,12 @@ impl FileCacheState { .context("failed to extract file cache size from query result") } - /// Attempt to set the file cache size, returning the size it was actually - /// set to. - #[tracing::instrument(skip_all, fields(%num_bytes))] - pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result { + /// Calculates the desired size of the cache, relative to the total size given and capped by + /// the cache's maximum size (requires querying). + #[tracing::instrument(skip(self))] + pub async fn calculate_cache_size(&mut self, total_bytes: u64) -> anyhow::Result { + let desired_bytes = self.config.calculate_cache_size(total_bytes); + let max_bytes = self // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes // means that the end result we get is in bytes. 
@@ -277,20 +287,21 @@ impl FileCacheState { .map(|bytes| bytes as u64) .context("failed to extract max file cache size from query result")?; - let max_mb = max_bytes / MiB; - let num_mb = u64::min(num_bytes, max_bytes) / MiB; + if desired_bytes > max_bytes { + warn!( + desired_mb = bytes_to_mebibytes(desired_bytes), + max_mb = bytes_to_mebibytes(max_bytes), + "desired file cache size capped by maximum size" + ); + } - let capped = if num_bytes > max_bytes { - " (capped by maximum size)" - } else { - "" - }; + Ok(desired_bytes.min(max_bytes)) + } - info!( - size = num_mb, - max = max_mb, - "updating file cache size {capped}", - ); + /// Set the file cache size to a value returned by `calculate_size` + #[tracing::instrument(skip_all, fields(num_bytes))] + pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result { + let num_mb = num_bytes / MiB; // note: even though the normal ways to get the cache size produce values with trailing "MB" // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format diff --git a/libs/vm_monitor/src/runner.rs b/libs/vm_monitor/src/runner.rs index b0ee5f0310a9..b2fd8793cb5a 100644 --- a/libs/vm_monitor/src/runner.rs +++ b/libs/vm_monitor/src/runner.rs @@ -16,7 +16,7 @@ use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; -use crate::cgroup::{CgroupWatcher, Sequenced}; +use crate::cgroup::{self, CgroupWatcher}; use crate::dispatcher::Dispatcher; use crate::filecache::{FileCacheConfig, FileCacheState}; use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources}; @@ -89,10 +89,9 @@ impl Runner { // *NOTE*: the dispatcher and cgroup manager talk through these channels // so make sure they each get the correct half, nothing is droppped, etc. - let (notified_send, notified_recv) = mpsc::channel(1); let (requesting_send, requesting_recv) = mpsc::channel(1); - let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv) + let dispatcher = Dispatcher::new(ws, requesting_recv) .await .context("error creating new dispatcher")?; @@ -121,15 +120,8 @@ impl Runner { // now, and then set limits later. info!("initializing cgroup"); - let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send) - .context("failed to create cgroup manager")?; - - info!("temporarily unsetting memory.high"); - - // Temporarily un-set cgroup memory.high; see above. 
- cgroup - .unset_memory_high() - .context("failed to unset memory.high")?; + let (cgroup, controller, cgroup_event_stream) = + CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?; let cgroup = Arc::new(cgroup); @@ -137,9 +129,20 @@ impl Runner { spawn_with_cancel( token.clone(), |_| error!("cgroup watcher terminated"), - async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await }, + async move { + cgroup_clone + .watch(controller, requesting_send, cgroup_event_stream) + .await + }, ); + info!("temporarily unsetting memory.high"); + + cgroup + .unset_memory_high() + .await + .context("failed to unset memory.high")?; + state.cgroup = Some(cgroup); } @@ -165,7 +168,10 @@ impl Runner { .await .context("error getting file cache size")?; - let new_size = file_cache.config.calculate_cache_size(mem); + let new_size = file_cache + .calculate_cache_size(mem) + .await + .context("failed to calculate new file cache size")?; info!( initial = bytes_to_mebibytes(size), new = bytes_to_mebibytes(new_size), @@ -174,16 +180,13 @@ impl Runner { // note: even if size == new_size, we want to explicitly set it, just // to make sure that we have the permissions to do so - let actual_size = file_cache + file_cache .set_file_cache_size(new_size) .await .context("failed to set file cache size, possibly due to inadequate permissions")?; - if actual_size != new_size { - info!("file cache size actually got set to {actual_size}") - } // Mark the resources given to the file cache as reserved, but only if it's in memory. if !args.file_cache_on_disk { - file_cache_reserved_bytes = actual_size; + file_cache_reserved_bytes = new_size; } state.filecache = Some(file_cache); @@ -191,13 +194,10 @@ impl Runner { if let Some(cgroup) = &state.cgroup { let available = mem - file_cache_reserved_bytes; - let value = cgroup.config.calculate_memory_high_value(available); - - info!(value, "setting memory.high"); - cgroup - .set_memory_high_bytes(value) - .context("failed to set cgroup memory.high")?; + .set_initial_memory_high(cgroup::MemorySize { bytes: available }) + .await + .context("failed to set initial cgroup memory.high")?; } Ok(state) @@ -217,73 +217,45 @@ impl Runner { let requested_mem = target.mem; let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes); - let expected_file_cache_mem_usage = self - .filecache - .as_ref() - .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory)) - .unwrap_or(0); - let mut new_cgroup_mem_high = 0; - if let Some(cgroup) = &self.cgroup { - new_cgroup_mem_high = cgroup - .config - .calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage); - - let current = cgroup - .current_memory_usage() - .context("failed to fetch cgroup memory")?; - - if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes { - let status = format!( - "{}: {} MiB (new high) < {} (current usage) + {} (buffer)", - "calculated memory.high too low", - bytes_to_mebibytes(new_cgroup_mem_high), - bytes_to_mebibytes(current), - bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes) - ); + let (new_file_cache_size, file_cache_mem_reserved_bytes) = match &mut self.filecache { + None => (0, 0), + Some(fc) => { + let size = fc.calculate_cache_size(usable_system_memory).await?; + match fc.in_memory() { + true => (size, size), + false => (size, 0), + } + } + }; - info!(status, "discontinuing downscale"); + let mut status = vec![]; - return Ok((false, status)); + if let Some(cgroup) = &self.cgroup { + 
let available = usable_system_memory.saturating_sub(file_cache_mem_reserved_bytes); + + match cgroup + .downscale(cgroup::MemorySize { bytes: available }) + .await? + { + Ok(accepted_message) => status.push(accepted_message), + Err(denied_message) => { + info!(status = denied_message, "discontinuing downscale"); + return Ok((false, denied_message)); + } } } - // The downscaling has been approved. Downscale the file cache, then the cgroup. - let mut status = vec![]; - let mut file_cache_mem_usage = 0; + // The downscaling has been approved and actioned by the cgroup (if there is one), which is + // the only component that can reject it, so all that's left is to update the file cache. if let Some(file_cache) = &mut self.filecache { let actual_usage = file_cache - .set_file_cache_size(expected_file_cache_mem_usage) + .set_file_cache_size(new_file_cache_size) .await .context("failed to set file cache size")?; - if file_cache.config.in_memory { - file_cache_mem_usage = actual_usage; - } let message = format!( "set file cache size to {} MiB (in memory = {})", bytes_to_mebibytes(actual_usage), - file_cache.config.in_memory, - ); - info!("downscale: {message}"); - status.push(message); - } - - if let Some(cgroup) = &self.cgroup { - let available_memory = usable_system_memory - file_cache_mem_usage; - - if file_cache_mem_usage != expected_file_cache_mem_usage { - new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory); - } - - // new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here - // since it is properly initialized in the previous cgroup if let block - cgroup - .set_memory_high_bytes(new_cgroup_mem_high) - .context("failed to set cgroup memory.high")?; - - let message = format!( - "set cgroup memory.high to {} MiB, of new max {} MiB", - bytes_to_mebibytes(new_cgroup_mem_high), - bytes_to_mebibytes(available_memory) + file_cache.in_memory(), ); info!("downscale: {message}"); status.push(message); @@ -308,42 +280,34 @@ impl Runner { // Get the file cache's expected contribution to the memory usage let mut file_cache_mem_usage = 0; if let Some(file_cache) = &mut self.filecache { - let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory); + let new_size = file_cache + .calculate_cache_size(usable_system_memory) + .await + .context("failed to calculate new file cache size")?; + info!( - target = bytes_to_mebibytes(expected_usage), + size = bytes_to_mebibytes(new_size), total = bytes_to_mebibytes(new_mem), "updating file cache size", ); - let actual_usage = file_cache - .set_file_cache_size(expected_usage) + let actual_size = file_cache + .set_file_cache_size(new_size) .await .context("failed to set file cache size")?; - if file_cache.config.in_memory { - file_cache_mem_usage = actual_usage; - } - - if actual_usage != expected_usage { - warn!( - "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib", - bytes_to_mebibytes(expected_usage), - bytes_to_mebibytes(actual_usage) - ) + if file_cache.in_memory() { + file_cache_mem_usage = actual_size; } } if let Some(cgroup) = &self.cgroup { - let available_memory = usable_system_memory - file_cache_mem_usage; - let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory); - info!( - target = bytes_to_mebibytes(new_cgroup_mem_high), - total = bytes_to_mebibytes(new_mem), - name = cgroup.path(), - "updating cgroup memory.high", - ); + let available_memory = usable_system_memory.saturating_sub(file_cache_mem_usage); + cgroup - 
.set_memory_high_bytes(new_cgroup_mem_high) - .context("failed to set cgroup memory.high")?; + .upscale(cgroup::MemorySize { + bytes: available_memory, + }) + .await?; } Ok(()) @@ -361,10 +325,6 @@ impl Runner { self.handle_upscale(granted) .await .context("failed to handle upscale")?; - self.dispatcher - .notify_upscale(Sequenced::new(granted)) - .await - .context("failed to notify notify cgroup of upscale")?; Ok(Some(OutboundMsg::new( OutboundMsgKind::UpscaleConfirmation {}, id,
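
The heart of this refactor is the request/response channel between the `Runner`
and the `CgroupWatcher`: callers send a `CgroupCommand` paired with a
`oneshot::Sender` for the result, and the watcher task replies on that oneshot
once it has acted on the cgroup. Below is a minimal, self-contained sketch of
that pattern in isolation -- the names (`Command`, `CommandResult`, `Watcher`)
are simplified stand-ins, not code taken from the diff:

```rust
use anyhow::{anyhow, Context};
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
enum Command {
    Downscale { target_bytes: u64 },
}

#[derive(Debug)]
enum CommandResult {
    // "Graceful" failure (e.g. a rejected downscale) is a normal result,
    // distinct from an `anyhow::Error` meaning the watcher itself broke.
    Success { message: String },
    Failure { message: String },
}

struct Watcher {
    command_sender: mpsc::Sender<(Command, oneshot::Sender<CommandResult>)>,
}

impl Watcher {
    /// Send a command and wait for the watcher task to act on it and reply.
    async fn do_command(&self, cmd: Command) -> anyhow::Result<CommandResult> {
        let (tx_result, rx_result) = oneshot::channel();
        self.command_sender
            .send((cmd, tx_result))
            .await
            .map_err(|_| anyhow!("command channel receiver dropped"))?;
        rx_result
            .await
            .context("watcher task dropped the response sender")
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = mpsc::channel(1);
    let watcher = Watcher { command_sender: tx };

    // The watcher task: owns the receiver, handles one command at a time,
    // and answers on the per-command oneshot channel.
    tokio::spawn(async move {
        while let Some((cmd, reply)) = rx.recv().await {
            let result = match cmd {
                Command::Downscale { target_bytes } => CommandResult::Success {
                    message: format!("would set memory.high to {target_bytes} bytes"),
                },
            };
            // Ignore errors: the caller may have stopped waiting.
            let _ = reply.send(result);
        }
    });

    match watcher
        .do_command(Command::Downscale { target_bytes: 256 * 1024 * 1024 })
        .await?
    {
        CommandResult::Success { message } => println!("ok: {message}"),
        CommandResult::Failure { message } => println!("rejected: {message}"),
    }
    Ok(())
}
```

Serializing everything through one task is what already resolves point 1 above:
"raise memory.high on a memory.high event" and "set memory.high on upscale" now
run through the same loop, in order, instead of racing from separate threads.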
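
The new dependency on `pin-project-lite` exists because the `watch()` loop's
state now holds an optional `tokio::time::Sleep` deadline ("must unfreeze by
then"), and `Sleep` has to stay pinned to be polled, while the other state
fields still need ordinary `&mut` access. Here is a small sketch of just that
projection pattern, with simplified field names and without the surrounding
`biased` `tokio::select!` loop:

```rust
use pin_project_lite::pin_project;
use std::pin::pin;
use tokio::time::{sleep_until, Duration, Instant, Sleep};

pin_project! {
    struct State {
        // Pinned field: `Sleep` is !Unpin, so it can only be awaited through
        // a `Pin<&mut Sleep>`, which the projection hands us.
        #[pin]
        unfreeze_deadline: Option<Sleep>,
        // Ordinary fields project to plain `&mut` references.
        waiting_on_upscale: bool,
    }
}

#[tokio::main]
async fn main() {
    let state = pin!(State {
        unfreeze_deadline: None,
        waiting_on_upscale: false,
    });
    let mut state = state.project();

    // Arm the deadline. `Pin::set` replaces the pinned value in place,
    // which is allowed even though `Sleep` is not `Unpin`.
    state
        .unfreeze_deadline
        .set(Some(sleep_until(Instant::now() + Duration::from_millis(10))));
    *state.waiting_on_upscale = true;

    // Await the deadline if it is armed: `as_pin_mut()` turns
    // `Pin<&mut Option<Sleep>>` into `Option<Pin<&mut Sleep>>`.
    if let Some(deadline) = state.unfreeze_deadline.as_mut().as_pin_mut() {
        deadline.await;
        println!("deadline elapsed; a real watcher would thaw the cgroup here");
    }

    // Disarm it once handled.
    state.unfreeze_deadline.set(None);
    *state.waiting_on_upscale = false;
    println!("waiting_on_upscale = {}", *state.waiting_on_upscale);
}
```

In the patch itself the projected deadline is polled as one branch of the
`biased` select!, guarded by `is_some()`, so the loop keeps reacting to
commands and memory.high events while a freeze timeout is pending.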