diff --git a/libs/vm_monitor/README.md b/libs/vm_monitor/README.md index 53cdecd9f38f..fdd943077dc4 100644 --- a/libs/vm_monitor/README.md +++ b/libs/vm_monitor/README.md @@ -27,8 +27,8 @@ and old one if it exists. * the filecache: a struct that allows communication with the Postgres file cache. On startup, we connect to the filecache and hold on to the connection for the entire monitor lifetime. -* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by -listening for `memory.high` events and setting its `memory.{high,max}` values. +* the cgroup watcher: the `CgroupWatcher` polls the `neon-postgres` cgroup's memory +usage and sends rolling aggregates to the runner. * the runner: the runner marries the filecache and cgroup watcher together, communicating with the agent throught the `Dispatcher`, and then calling filecache and cgroup watcher functions as needed to upscale and downscale diff --git a/libs/vm_monitor/src/cgroup.rs b/libs/vm_monitor/src/cgroup.rs index 15e972505e46..7160a42df256 100644 --- a/libs/vm_monitor/src/cgroup.rs +++ b/libs/vm_monitor/src/cgroup.rs @@ -1,161 +1,38 @@ -use std::{ - fmt::{Debug, Display}, - fs, - pin::pin, - sync::atomic::{AtomicU64, Ordering}, -}; +use std::fmt::{self, Debug, Formatter}; +use std::time::{Duration, Instant}; -use anyhow::{anyhow, bail, Context}; +use anyhow::{anyhow, Context}; use cgroups_rs::{ - freezer::FreezerController, - hierarchies::{self, is_cgroup2_unified_mode, UNIFIED_MOUNTPOINT}, + hierarchies::{self, is_cgroup2_unified_mode}, memory::MemController, - MaxValue, - Subsystem::{Freezer, Mem}, + Subsystem, }; -use inotify::{EventStream, Inotify, WatchMask}; -use tokio::sync::mpsc::{self, error::TryRecvError}; -use tokio::time::{Duration, Instant}; -use tokio_stream::{Stream, StreamExt}; +use tokio::sync::watch; use tracing::{info, warn}; -use crate::protocol::Resources; -use crate::MiB; - -/// Monotonically increasing counter of the number of memory.high events -/// the cgroup has 
experienced. -/// -/// We use this to determine if a modification to the `memory.events` file actually -/// changed the `high` field. If not, we don't care about the change. When we -/// read the file, we check the `high` field in the file against `MEMORY_EVENT_COUNT` -/// to see if it changed since last time. -pub static MEMORY_EVENT_COUNT: AtomicU64 = AtomicU64::new(0); - -/// Monotonically increasing counter that gives each cgroup event a unique id. -/// -/// This allows us to answer questions like "did this upscale arrive before this -/// memory.high?". This static is also used by the `Sequenced` type to "tag" values -/// with a sequence number. As such, prefer to used the `Sequenced` type rather -/// than this static directly. -static EVENT_SEQUENCE_NUMBER: AtomicU64 = AtomicU64::new(0); - -/// A memory event type reported in memory.events. -#[derive(Debug, Eq, PartialEq, Copy, Clone)] -pub enum MemoryEvent { - Low, - High, - Max, - Oom, - OomKill, - OomGroupKill, -} - -impl MemoryEvent { - fn as_str(&self) -> &str { - match self { - MemoryEvent::Low => "low", - MemoryEvent::High => "high", - MemoryEvent::Max => "max", - MemoryEvent::Oom => "oom", - MemoryEvent::OomKill => "oom_kill", - MemoryEvent::OomGroupKill => "oom_group_kill", - } - } -} - -impl Display for MemoryEvent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.as_str()) - } -} - /// Configuration for a `CgroupWatcher` #[derive(Debug, Clone)] pub struct Config { - // The target difference between the total memory reserved for the cgroup - // and the value of the cgroup's memory.high. - // - // In other words, memory.high + oom_buffer_bytes will equal the total memory that the cgroup may - // use (equal to system memory, minus whatever's taken out for the file cache). 
- oom_buffer_bytes: u64, - - // The amount of memory, in bytes, below a proposed new value for - // memory.high that the cgroup's memory usage must be for us to downscale - // - // In other words, we can downscale only when: - // - // memory.current + memory_high_buffer_bytes < (proposed) memory.high - // - // TODO: there's some minor issues with this approach -- in particular, that we might have - // memory in use by the kernel's page cache that we're actually ok with getting rid of. - pub(crate) memory_high_buffer_bytes: u64, - - // The maximum duration, in milliseconds, that we're allowed to pause - // the cgroup for while waiting for the autoscaler-agent to upscale us - max_upscale_wait: Duration, - - // The required minimum time, in milliseconds, that we must wait before re-freezing - // the cgroup while waiting for the autoscaler-agent to upscale us. - do_not_freeze_more_often_than: Duration, - - // The amount of memory, in bytes, that we should periodically increase memory.high - // by while waiting for the autoscaler-agent to upscale us. - // - // This exists to avoid the excessive throttling that happens when a cgroup is above its - // memory.high for too long. See more here: - // https://github.com/neondatabase/autoscaling/issues/44#issuecomment-1522487217 - memory_high_increase_by_bytes: u64, - - // The period, in milliseconds, at which we should repeatedly increase the value - // of the cgroup's memory.high while we're waiting on upscaling and memory.high - // is still being hit. - // - // Technically speaking, this actually serves as a rate limit to moderate responding to - // memory.high events, but these are roughly equivalent if the process is still allocating - // memory. 
- memory_high_increase_every: Duration, -} + /// Interval at which we should be fetching memory statistics + memory_poll_interval: Duration, -impl Config { - /// Calculate the new value for the cgroups memory.high based on system memory - pub fn calculate_memory_high_value(&self, total_system_mem: u64) -> u64 { - total_system_mem.saturating_sub(self.oom_buffer_bytes) - } + /// The number of samples used in constructing aggregated memory statistics + memory_history_len: usize, + /// The number of most recent samples that will be periodically logged. + /// + /// Each sample is logged exactly once. Increasing this value means that recent samples will be + /// logged less frequently, and vice versa. + /// + /// For simplicity, this value must be greater than or equal to `memory_history_len`. + memory_history_log_interval: usize, } impl Default for Config { fn default() -> Self { Self { - oom_buffer_bytes: 100 * MiB, - memory_high_buffer_bytes: 100 * MiB, - // while waiting for upscale, don't freeze for more than 20ms every 1s - max_upscale_wait: Duration::from_millis(20), - do_not_freeze_more_often_than: Duration::from_millis(1000), - // while waiting for upscale, increase memory.high by 10MiB every 25ms - memory_high_increase_by_bytes: 10 * MiB, - memory_high_increase_every: Duration::from_millis(25), - } - } -} - -/// Used to represent data that is associated with a certain point in time, such -/// as an upscale request or memory.high event. -/// -/// Internally, creating a `Sequenced` uses a static atomic counter to obtain -/// a unique sequence number. Sequence numbers are monotonically increasing, -/// allowing us to answer questions like "did this upscale happen after this -/// memory.high event?" by comparing the sequence numbers of the two events. 
-#[derive(Debug, Clone)] -pub struct Sequenced { - seqnum: u64, - data: T, -} - -impl Sequenced { - pub fn new(data: T) -> Self { - Self { - seqnum: EVENT_SEQUENCE_NUMBER.fetch_add(1, Ordering::AcqRel), - data, + memory_poll_interval: Duration::from_millis(100), + memory_history_len: 5, // use 500ms of history for decision-making + memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy) } } } @@ -170,74 +47,14 @@ impl Sequenced { pub struct CgroupWatcher { pub config: Config, - /// The sequence number of the last upscale. - /// - /// If we receive a memory.high event that has a _lower_ sequence number than - /// `last_upscale_seqnum`, then we know it occured before the upscale, and we - /// can safely ignore it. - /// - /// Note: Like the `events` field, this doesn't _need_ interior mutability but we - /// use it anyways so that methods take `&self`, not `&mut self`. - last_upscale_seqnum: AtomicU64, - - /// A channel on which we send messages to request upscale from the dispatcher. - upscale_requester: mpsc::Sender<()>, - /// The actual cgroup we are watching and managing. cgroup: cgroups_rs::Cgroup, } -/// Read memory.events for the desired event type. -/// -/// `path` specifies the path to the desired `memory.events` file. -/// For more info, see the `memory.events` section of the [kernel docs] -/// -fn get_event_count(path: &str, event: MemoryEvent) -> anyhow::Result { - let contents = fs::read_to_string(path) - .with_context(|| format!("failed to read memory.events from {path}"))?; - - // Then contents of the file look like: - // low 42 - // high 101 - // ... 
- contents - .lines() - .filter_map(|s| s.split_once(' ')) - .find(|(e, _)| *e == event.as_str()) - .ok_or_else(|| anyhow!("failed to find entry for memory.{event} events in {path}")) - .and_then(|(_, count)| { - count - .parse::() - .with_context(|| format!("failed to parse memory.{event} as u64")) - }) -} - -/// Create an event stream that produces events whenever the file at the provided -/// path is modified. -fn create_file_watcher(path: &str) -> anyhow::Result> { - info!("creating file watcher for {path}"); - let inotify = Inotify::init().context("failed to initialize file watcher")?; - inotify - .watches() - .add(path, WatchMask::MODIFY) - .with_context(|| format!("failed to start watching {path}"))?; - inotify - // The inotify docs use [0u8; 1024] so we'll just copy them. We only need - // to store one event at a time - if the event gets written over, that's - // ok. We still see that there is an event. For more information, see: - // https://man7.org/linux/man-pages/man7/inotify.7.html - .into_event_stream([0u8; 1024]) - .context("failed to start inotify event stream") -} - impl CgroupWatcher { /// Create a new `CgroupWatcher`. #[tracing::instrument(skip_all, fields(%name))] - pub fn new( - name: String, - // A channel on which to send upscale requests - upscale_requester: mpsc::Sender<()>, - ) -> anyhow::Result<(Self, impl Stream>)> { + pub fn new(name: String) -> anyhow::Result { // TODO: clarify exactly why we need v2 // Make sure cgroups v2 (aka unified) are supported if !is_cgroup2_unified_mode() { @@ -245,410 +62,203 @@ impl CgroupWatcher { } let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name); - // Start monitoring the cgroup for memory events. In general, for - // cgroups v2 (aka unified), metrics are reported in files like - // > `/sys/fs/cgroup/{name}/{metric}` - // We are looking for `memory.high` events, which are stored in the - // file `memory.events`. 
For more info, see the `memory.events` section - // of https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files - let path = format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name); - let memory_events = create_file_watcher(&path) - .with_context(|| format!("failed to create event watcher for {path}"))? - // This would be nice with with .inspect_err followed by .ok - .filter_map(move |_| match get_event_count(&path, MemoryEvent::High) { - Ok(high) => Some(high), - Err(error) => { - // TODO: Might want to just panic here - warn!(?error, "failed to read high events count from {}", &path); - None - } - }) - // Only report the event if the memory.high count increased - .filter_map(|high| { - if MEMORY_EVENT_COUNT.fetch_max(high, Ordering::AcqRel) < high { - Some(high) - } else { - None - } - }) - .map(Sequenced::new); - - let initial_count = get_event_count( - &format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name), - MemoryEvent::High, - )?; - - info!(initial_count, "initial memory.high event count"); - - // Hard update `MEMORY_EVENT_COUNT` since there could have been processes - // running in the cgroup before that caused it to be non-zero. - MEMORY_EVENT_COUNT.fetch_max(initial_count, Ordering::AcqRel); - - Ok(( - Self { - cgroup, - upscale_requester, - last_upscale_seqnum: AtomicU64::new(0), - config: Default::default(), - }, - memory_events, - )) + Ok(Self { + cgroup, + config: Default::default(), + }) } /// The entrypoint for the `CgroupWatcher`. #[tracing::instrument(skip_all)] - pub async fn watch( + pub async fn watch( &self, - // These are ~dependency injected~ (fancy, I know) because this function - // should never return. - // -> therefore: when we tokio::spawn it, we don't await the JoinHandle. - // -> therefore: if we want to stick it in an Arc so many threads can access - // it, methods can never take mutable access. 
- // - note: we use the Arc strategy so that a) we can call this function - // right here and b) the runner can call the set/get_memory methods - // -> since calling recv() on a tokio::sync::mpsc::Receiver takes &mut self, - // we just pass them in here instead of holding them in fields, as that - // would require this method to take &mut self. - mut upscales: mpsc::Receiver>, - events: E, - ) -> anyhow::Result<()> - where - E: Stream>, - { - let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO)); - let mut last_memory_high_increase_at: Option = None; - let mut events = pin!(events); - - // Are we waiting to be upscaled? Could be true if we request upscale due - // to a memory.high event and it does not arrive in time. - let mut waiting_on_upscale = false; - - loop { - tokio::select! { - upscale = upscales.recv() => { - let Sequenced { seqnum, data } = upscale - .context("failed to listen on upscale notification channel")?; - waiting_on_upscale = false; - last_memory_high_increase_at = None; - self.last_upscale_seqnum.store(seqnum, Ordering::Release); - info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale"); - } - event = events.next() => { - let Some(Sequenced { seqnum, .. }) = event else { - bail!("failed to listen for memory.high events") - }; - // The memory.high came before our last upscale, so we consider - // it resolved - if self.last_upscale_seqnum.fetch_max(seqnum, Ordering::AcqRel) > seqnum { - info!( - "received memory.high event, but it came before our last upscale -> ignoring it" - ); - continue; - } - - // The memory.high came after our latest upscale. We don't - // want to do anything yet, so peek the next event in hopes - // that it's an upscale. - if let Some(upscale_num) = self - .upscaled(&mut upscales) - .context("failed to check if we were upscaled")? 
- { - if upscale_num > seqnum { - info!( - "received memory.high event, but it came before our last upscale -> ignoring it" - ); - continue; - } - } - - // If it's been long enough since we last froze, freeze the - // cgroup and request upscale - if wait_to_freeze.is_elapsed() { - info!("received memory.high event -> requesting upscale"); - waiting_on_upscale = self - .handle_memory_high_event(&mut upscales) - .await - .context("failed to handle upscale")?; - wait_to_freeze - .as_mut() - .reset(Instant::now() + self.config.do_not_freeze_more_often_than); - continue; - } - - // Ok, we can't freeze, just request upscale - if !waiting_on_upscale { - info!("received memory.high event, but too soon to refreeze -> requesting upscale"); - - // Make check to make sure we haven't been upscaled in the - // meantine (can happen if the agent independently decides - // to upscale us again) - if self - .upscaled(&mut upscales) - .context("failed to check if we were upscaled")? - .is_some() - { - info!("no need to request upscaling because we got upscaled"); - continue; - } - self.upscale_requester - .send(()) - .await - .context("failed to request upscale")?; - waiting_on_upscale = true; - continue; - } - - // Shoot, we can't freeze or and we're still waiting on upscale, - // increase memory.high to reduce throttling - let can_increase_memory_high = match last_memory_high_increase_at { - None => true, - Some(t) => t.elapsed() > self.config.memory_high_increase_every, - }; - if can_increase_memory_high { - info!( - "received memory.high event, \ - but too soon to refreeze and already requested upscale \ - -> increasing memory.high" - ); - - // Make check to make sure we haven't been upscaled in the - // meantine (can happen if the agent independently decides - // to upscale us again) - if self - .upscaled(&mut upscales) - .context("failed to check if we were upscaled")? 
- .is_some() - { - info!("no need to increase memory.high because got upscaled"); - continue; - } - - // Request upscale anyways (the agent will handle deduplicating - // requests) - self.upscale_requester - .send(()) - .await - .context("failed to request upscale")?; - - let memory_high = - self.get_memory_high_bytes().context("failed to get memory.high")?; - let new_high = memory_high + self.config.memory_high_increase_by_bytes; - info!( - current_high_bytes = memory_high, - new_high_bytes = new_high, - "updating memory.high" - ); - self.set_memory_high_bytes(new_high) - .context("failed to set memory.high")?; - last_memory_high_increase_at = Some(Instant::now()); - continue; - } - - info!("received memory.high event, but can't do anything"); - } - }; - } - } + updates: watch::Sender<(Instant, MemoryHistory)>, + ) -> anyhow::Result<()> { + // this requirement makes the code a bit easier to work with; see the config for more. + assert!(self.config.memory_history_len <= self.config.memory_history_log_interval); - /// Handle a `memory.high`, returning whether we are still waiting on upscale - /// by the time the function returns. - /// - /// The general plan for handling a `memory.high` event is as follows: - /// 1. Freeze the cgroup - /// 2. Start a timer for `self.config.max_upscale_wait` - /// 3. Request upscale - /// 4. After the timer elapses or we receive upscale, thaw the cgroup. - /// 5. Return whether or not we are still waiting for upscale. If we are, - /// we'll increase the cgroups memory.high to avoid getting oom killed - #[tracing::instrument(skip_all)] - async fn handle_memory_high_event( - &self, - upscales: &mut mpsc::Receiver>, - ) -> anyhow::Result { - // Immediately freeze the cgroup before doing anything else. 
- info!("received memory.high event -> freezing cgroup"); - self.freeze().context("failed to freeze cgroup")?; - - // We'll use this for logging durations - let start_time = Instant::now(); - - // Await the upscale until we have to unfreeze - let timed = - tokio::time::timeout(self.config.max_upscale_wait, self.await_upscale(upscales)); - - // Request the upscale - info!( - wait = ?self.config.max_upscale_wait, - "sending request for immediate upscaling", - ); - self.upscale_requester - .send(()) - .await - .context("failed to request upscale")?; - - let waiting_on_upscale = match timed.await { - Ok(Ok(())) => { - info!(elapsed = ?start_time.elapsed(), "received upscale in time"); - false - } - // **important**: unfreeze the cgroup before ?-reporting the error - Ok(Err(e)) => { - info!("error waiting for upscale -> thawing cgroup"); - self.thaw() - .context("failed to thaw cgroup after errored waiting for upscale")?; - Err(e.context("failed to await upscale"))? - } - Err(_) => { - info!(elapsed = ?self.config.max_upscale_wait, "timed out waiting for upscale"); - true - } - }; + let mut ticker = tokio::time::interval(self.config.memory_poll_interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0 - info!("thawing cgroup"); - self.thaw().context("failed to thaw cgroup")?; + let mem_controller = self.memory()?; - Ok(waiting_on_upscale) - } + // buffer for samples that will be logged. once full, it remains so. + let history_log_len = self.config.memory_history_log_interval; + let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len]; - /// Checks whether we were just upscaled, returning the upscale's sequence - /// number if so. 
- #[tracing::instrument(skip_all)] - fn upscaled( - &self, - upscales: &mut mpsc::Receiver>, - ) -> anyhow::Result> { - let Sequenced { seqnum, data } = match upscales.try_recv() { - Ok(upscale) => upscale, - Err(TryRecvError::Empty) => return Ok(None), - Err(TryRecvError::Disconnected) => { - bail!("upscale notification channel was disconnected") - } - }; + for t in 0_u64.. { + ticker.tick().await; - // Make sure to update the last upscale sequence number - self.last_upscale_seqnum.store(seqnum, Ordering::Release); - info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale"); - Ok(Some(seqnum)) - } + let now = Instant::now(); + let mem = Self::memory_usage(mem_controller); - /// Await an upscale event, discarding any `memory.high` events received in - /// the process. - /// - /// This is used in `handle_memory_high_event`, where we need to listen - /// for upscales in particular so we know if we can thaw the cgroup early. - #[tracing::instrument(skip_all)] - async fn await_upscale( - &self, - upscales: &mut mpsc::Receiver>, - ) -> anyhow::Result<()> { - let Sequenced { seqnum, .. } = upscales - .recv() - .await - .context("error listening for upscales")?; + let i = t as usize % history_log_len; + history_log_buf[i] = mem; - self.last_upscale_seqnum.store(seqnum, Ordering::Release); - Ok(()) - } + // We're taking *at most* memory_history_len values; we may be bounded by the total + // number of samples that have come in so far. + let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize; + // NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact + // that we just inserted a value there, so the end of the iterator will *include* the + // value at i, rather than stopping just short of it. + let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count); - /// Get the cgroup's name. 
- pub fn path(&self) -> &str { - self.cgroup.path() - } -} + let summary = MemoryHistory { + avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::() + / samples_count as u64, + samples_count, + samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32, + }; -// Methods for manipulating the actual cgroup -impl CgroupWatcher { - /// Get a handle on the freezer subsystem. - fn freezer(&self) -> anyhow::Result<&FreezerController> { - if let Some(Freezer(freezer)) = self - .cgroup - .subsystems() - .iter() - .find(|sub| matches!(sub, Freezer(_))) - { - Ok(freezer) - } else { - anyhow::bail!("could not find freezer subsystem") - } - } + // Log the current history if it's time to do so. Because `history_log_buf` has length + // equal to the logging interval, we can just log the entire buffer every time we set + // the last entry, which also means that for this log line, we can ignore that it's a + // ring buffer (because all the entries are in order of increasing time). + if i == history_log_len - 1 { + info!( + history = ?MemoryStatus::debug_slice(&history_log_buf), + summary = ?summary, + "Recent cgroup memory statistics history" + ); + } - /// Attempt to freeze the cgroup. - pub fn freeze(&self) -> anyhow::Result<()> { - self.freezer() - .context("failed to get freezer subsystem")? - .freeze() - .context("failed to freeze") - } + updates + .send((now, summary)) + .context("failed to send MemoryHistory")?; + } - /// Attempt to thaw the cgroup. - pub fn thaw(&self) -> anyhow::Result<()> { - self.freezer() - .context("failed to get freezer subsystem")? - .thaw() - .context("failed to thaw") + unreachable!() } /// Get a handle on the memory subsystem. 
- /// - /// Note: this method does not require `self.memory_update_lock` because - /// getting a handle to the subsystem does not access any of the files we - /// care about, such as memory.high and memory.events fn memory(&self) -> anyhow::Result<&MemController> { - if let Some(Mem(memory)) = self - .cgroup + self.cgroup .subsystems() .iter() - .find(|sub| matches!(sub, Mem(_))) - { - Ok(memory) - } else { - anyhow::bail!("could not find memory subsystem") - } + .find_map(|sub| match sub { + Subsystem::Mem(c) => Some(c), + _ => None, + }) + .ok_or_else(|| anyhow!("could not find memory subsystem")) } - /// Get cgroup current memory usage. - pub fn current_memory_usage(&self) -> anyhow::Result { - Ok(self - .memory() - .context("failed to get memory subsystem")? - .memory_stat() - .usage_in_bytes) + /// Given a handle on the memory subsystem, returns the current memory information + fn memory_usage(mem_controller: &MemController) -> MemoryStatus { + let stat = mem_controller.memory_stat().stat; + MemoryStatus { + non_reclaimable: stat.active_anon + stat.inactive_anon, + } } +} - /// Set cgroup memory.high threshold. - pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> { - self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)) - } +// Helper function for `CgroupWatcher::watch` +fn ring_buf_recent_values_iter( + buf: &[T], + last_value_idx: usize, + count: usize, +) -> impl '_ + Iterator { + // Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function + // easier (we only have to add `buf.len()` once, rather than a dynamic number of times). + assert!(count <= buf.len()); + + buf.iter() + // 'cycle' because the values could wrap around + .cycle() + // with 'cycle', this skip is more like 'offset', and functionally this is + // offsetting by 'last_value_idx - count (mod buf.len())', but we have to be + // careful to avoid underflow, so we pre-add buf.len().
+ // The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive. + .skip((buf.len() + last_value_idx + 1 - count) % buf.len()) + .take(count) +} - /// Set the cgroup's memory.high to 'max', disabling it. - pub fn unset_memory_high(&self) -> anyhow::Result<()> { - self.set_memory_high_internal(MaxValue::Max) - } +/// Summary of recent memory usage +#[derive(Debug, Copy, Clone)] +pub struct MemoryHistory { + /// Rolling average of non-reclaimable memory usage samples over the last `samples_span` + pub avg_non_reclaimable: u64, - fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> { - self.memory() - .context("failed to get memory subsystem")? - .set_mem(cgroups_rs::memory::SetMemory { - low: None, - high: Some(value), - min: None, - max: None, - }) - .map_err(anyhow::Error::from) + /// The number of samples used to construct this summary + pub samples_count: usize, + /// Total timespan between the first and last sample used for this summary + pub samples_span: Duration, +} + +#[derive(Debug, Copy, Clone)] +pub struct MemoryStatus { + non_reclaimable: u64, +} + +impl MemoryStatus { + fn zeroed() -> Self { + MemoryStatus { non_reclaimable: 0 } } - /// Get memory.high threshold. - pub fn get_memory_high_bytes(&self) -> anyhow::Result { - let high = self - .memory() - .context("failed to get memory subsystem while getting memory statistics")?
- .get_mem() - .map(|mem| mem.high) - .context("failed to get memory statistics from subsystem")?; - match high { - Some(MaxValue::Max) => Ok(i64::MAX as u64), - Some(MaxValue::Value(high)) => Ok(high as u64), - None => anyhow::bail!("failed to read memory.high from memory subsystem"), + fn debug_slice(slice: &[Self]) -> impl '_ + Debug { + struct DS<'a>(&'a [MemoryStatus]); + + impl<'a> Debug for DS<'a> { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("[MemoryStatus]") + .field( + "non_reclaimable[..]", + &Fields(self.0, |stat: &MemoryStatus| { + BytesToGB(stat.non_reclaimable) + }), + ) + .finish() + } + } + + struct Fields<'a, F>(&'a [MemoryStatus], F); + + impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_list().entries(self.0.iter().map(&self.1)).finish() + } + } + + struct BytesToGB(u64); + + impl Debug for BytesToGB { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.write_fmt(format_args!( + "{:.3}Gi", + self.0 as f64 / (1_u64 << 30) as f64 + )) + } } + + DS(slice) + } +} + +#[cfg(test)] +mod tests { + #[test] + fn ring_buf_iter() { + let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + + let values = |offset, count| { + super::ring_buf_recent_values_iter(&buf, offset, count) + .copied() + .collect::>() + }; + + // Boundary conditions: start, end, and entire thing: + assert_eq!(values(0, 1), [0]); + assert_eq!(values(3, 4), [0, 1, 2, 3]); + assert_eq!(values(9, 4), [6, 7, 8, 9]); + assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + + // "normal" operation: no wraparound + assert_eq!(values(7, 4), [4, 5, 6, 7]); + + // wraparound: + assert_eq!(values(0, 4), [7, 8, 9, 0]); + assert_eq!(values(1, 4), [8, 9, 0, 1]); + assert_eq!(values(2, 4), [9, 0, 1, 2]); + assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]); } } diff --git a/libs/vm_monitor/src/dispatcher.rs b/libs/vm_monitor/src/dispatcher.rs index 109a68fff196..c76baf04e7a2 100644 
--- a/libs/vm_monitor/src/dispatcher.rs +++ b/libs/vm_monitor/src/dispatcher.rs @@ -12,12 +12,10 @@ use futures::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; -use tokio::sync::mpsc; use tracing::info; -use crate::cgroup::Sequenced; use crate::protocol::{ - OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, Resources, PROTOCOL_MAX_VERSION, + OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION, }; @@ -36,13 +34,6 @@ pub struct Dispatcher { /// We send messages to the agent through `sink` sink: SplitSink, - /// Used to notify the cgroup when we are upscaled. - pub(crate) notify_upscale_events: mpsc::Sender>, - - /// When the cgroup requests upscale it will send on this channel. In response - /// we send an `UpscaleRequst` to the agent. - pub(crate) request_upscale_events: mpsc::Receiver<()>, - /// The protocol version we have agreed to use with the agent. This is negotiated /// during the creation of the dispatcher, and should be the highest shared protocol /// version. @@ -61,11 +52,7 @@ impl Dispatcher { /// 1. Wait for the agent to sent the range of protocols it supports. /// 2. Send a protocol version that works for us as well, or an error if there /// is no compatible version. - pub async fn new( - stream: WebSocket, - notify_upscale_events: mpsc::Sender>, - request_upscale_events: mpsc::Receiver<()>, - ) -> anyhow::Result { + pub async fn new(stream: WebSocket) -> anyhow::Result { let (mut sink, mut source) = stream.split(); // Figure out the highest protocol version we both support @@ -119,22 +106,10 @@ impl Dispatcher { Ok(Self { sink, source, - notify_upscale_events, - request_upscale_events, proto_version: highest_shared_version, }) } - /// Notify the cgroup manager that we have received upscale and wait for - /// the acknowledgement. 
- #[tracing::instrument(skip_all, fields(?resources))] - pub async fn notify_upscale(&self, resources: Sequenced) -> anyhow::Result<()> { - self.notify_upscale_events - .send(resources) - .await - .context("failed to send resources and oneshot sender across channel") - } - /// Send a message to the agent. /// /// Although this function is small, it has one major benefit: it is the only diff --git a/libs/vm_monitor/src/runner.rs b/libs/vm_monitor/src/runner.rs index b0ee5f0310a9..a7a0995797fe 100644 --- a/libs/vm_monitor/src/runner.rs +++ b/libs/vm_monitor/src/runner.rs @@ -5,18 +5,16 @@ //! all functionality. use std::fmt::Debug; -use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::{bail, Context}; use axum::extract::ws::{Message, WebSocket}; use futures::StreamExt; -use tokio::sync::broadcast; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, watch}; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; -use crate::cgroup::{CgroupWatcher, Sequenced}; +use crate::cgroup::{self, CgroupWatcher}; use crate::dispatcher::Dispatcher; use crate::filecache::{FileCacheConfig, FileCacheState}; use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources}; @@ -28,7 +26,7 @@ use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args pub struct Runner { config: Config, filecache: Option, - cgroup: Option>, + cgroup: Option, dispatcher: Dispatcher, /// We "mint" new message ids by incrementing this counter and taking the value. @@ -45,6 +43,14 @@ pub struct Runner { kill: broadcast::Receiver<()>, } +#[derive(Debug)] +struct CgroupState { + watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>, + /// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale + /// requests. 
+ threshold: u64, +} + /// Configuration for a `Runner` #[derive(Debug)] pub struct Config { @@ -62,16 +68,56 @@ pub struct Config { /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field /// should be removed once we have a better solution there. sys_buffer_bytes: u64, + + /// Minimum fraction of total system memory reserved *before* the cgroup threshold; in + /// other words, providing a ceiling for the highest value of the threshold by enforcing that + /// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the + /// threshold. + /// + /// For example, a value of `0.1` means that 10% of total memory must remain after exceeding + /// the threshold, so the value of the cgroup threshold would always be capped at 90% of total + /// memory. + /// + /// The default value of `0.15` means that we *guarantee* sending upscale requests if the + /// cgroup is using more than 85% of total memory (even if we're *not* separately reserving + /// memory for the file cache). + cgroup_min_overhead_fraction: f64, + + cgroup_downscale_threshold_buffer_bytes: u64, } impl Default for Config { fn default() -> Self { Self { sys_buffer_bytes: 100 * MiB, + cgroup_min_overhead_fraction: 0.15, + cgroup_downscale_threshold_buffer_bytes: 100 * MiB, } } } +impl Config { + fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 { + // If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup, + // and thus be non-reclaimable, so we should allow for additional memory usage. + // + // If the file cache sits on disk, our desired stable system state is for it to be fully + // page cached (its contents should only be paged to/from disk in situations where we can't + // upscale fast enough). Page-cached memory is reclaimable, so we need to lower the + // threshold for non-reclaimable memory so we scale up *before* the kernel starts paging + // out the file cache.
+ let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size); + + // Even if we're not separately making room for the file cache (if it's in tmpfs), we still + // want our threshold to be met gracefully instead of letting postgres get OOM-killed. + // So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory + // remaining above the threshold. + let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64; + + memory_remaining_for_cgroup.min(max_threshold) + } +} + impl Runner { /// Create a new monitor. #[tracing::instrument(skip_all, fields(?config, ?args))] @@ -87,12 +133,7 @@ impl Runner { "invalid monitor Config: sys_buffer_bytes cannot be 0" ); - // *NOTE*: the dispatcher and cgroup manager talk through these channels - // so make sure they each get the correct half, nothing is droppped, etc. - let (notified_send, notified_recv) = mpsc::channel(1); - let (requesting_send, requesting_recv) = mpsc::channel(1); - - let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv) + let dispatcher = Dispatcher::new(ws) .await .context("error creating new dispatcher")?; @@ -106,46 +147,10 @@ impl Runner { kill, }; - // If we have both the cgroup and file cache integrations enabled, it's possible for - // temporary failures to result in cgroup throttling (from memory.high), that in turn makes - // it near-impossible to connect to the file cache (because it times out). Unfortunately, - // we *do* still want to determine the file cache size before setting the cgroup's - // memory.high, so it's not as simple as just swapping the order. - // - // Instead, the resolution here is that on vm-monitor startup (note: happens on each - // connection from autoscaler-agent, possibly multiple times per compute_ctl lifecycle), we - // temporarily unset memory.high, to allow any existing throttling to dissipate. It's a bit - // of a hacky solution, but helps with reliability. 
- if let Some(name) = &args.cgroup { - // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state - // now, and then set limits later. - info!("initializing cgroup"); - - let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send) - .context("failed to create cgroup manager")?; - - info!("temporarily unsetting memory.high"); - - // Temporarily un-set cgroup memory.high; see above. - cgroup - .unset_memory_high() - .context("failed to unset memory.high")?; - - let cgroup = Arc::new(cgroup); - - let cgroup_clone = Arc::clone(&cgroup); - spawn_with_cancel( - token.clone(), - |_| error!("cgroup watcher terminated"), - async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await }, - ); - - state.cgroup = Some(cgroup); - } - - let mut file_cache_reserved_bytes = 0; let mem = get_total_system_memory(); + let mut file_cache_disk_size = 0; + // We need to process file cache initialization before cgroup initialization, so that the memory // allocated to the file cache is appropriately taken into account when we decide the cgroup's // memory limits. @@ -156,7 +161,7 @@ impl Runner { false => FileCacheConfig::default_in_memory(), }; - let mut file_cache = FileCacheState::new(connstr, config, token) + let mut file_cache = FileCacheState::new(connstr, config, token.clone()) .await .context("failed to create file cache")?; @@ -181,23 +186,40 @@ impl Runner { if actual_size != new_size { info!("file cache size actually got set to {actual_size}") } - // Mark the resources given to the file cache as reserved, but only if it's in memory. 
- if !args.file_cache_on_disk { - file_cache_reserved_bytes = actual_size; + + if args.file_cache_on_disk { + file_cache_disk_size = actual_size; } state.filecache = Some(file_cache); } - if let Some(cgroup) = &state.cgroup { - let available = mem - file_cache_reserved_bytes; - let value = cgroup.config.calculate_memory_high_value(available); + if let Some(name) = &args.cgroup { + // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state + // now, and then set limits later. + info!("initializing cgroup"); + + let cgroup = + CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?; + + let init_value = cgroup::MemoryHistory { + avg_non_reclaimable: 0, + samples_count: 0, + samples_span: Duration::ZERO, + }; + let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value)); + + spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move { + cgroup.watch(hist_tx).await + }); - info!(value, "setting memory.high"); + let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size); + info!(threshold, "set initial cgroup threshold",); - cgroup - .set_memory_high_bytes(value) - .context("failed to set cgroup memory.high")?; + state.cgroup = Some(CgroupState { + watcher: hist_rx, + threshold, + }); } Ok(state) @@ -217,28 +239,40 @@ impl Runner { let requested_mem = target.mem; let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes); - let expected_file_cache_mem_usage = self + let (expected_file_cache_size, expected_file_cache_disk_size) = self .filecache .as_ref() - .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory)) - .unwrap_or(0); - let mut new_cgroup_mem_high = 0; + .map(|file_cache| { + let size = file_cache.config.calculate_cache_size(usable_system_memory); + match file_cache.config.in_memory { + true => (size, 0), + false => (size, size), + } + }) + .unwrap_or((0, 0)); if let Some(cgroup) = &self.cgroup { - new_cgroup_mem_high = 
cgroup + let (last_time, last_history) = *cgroup.watcher.borrow(); + + // TODO: make the duration here configurable. + if last_time.elapsed() > Duration::from_secs(5) { + bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information"); + } else if last_history.samples_count <= 1 { + bail!("haven't received enough cgroup memory stats yet"); + } + + let new_threshold = self .config - .calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage); + .cgroup_threshold(usable_system_memory, expected_file_cache_disk_size); - let current = cgroup - .current_memory_usage() - .context("failed to fetch cgroup memory")?; + let current = last_history.avg_non_reclaimable; - if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes { + if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes { let status = format!( - "{}: {} MiB (new high) < {} (current usage) + {} (buffer)", - "calculated memory.high too low", - bytes_to_mebibytes(new_cgroup_mem_high), + "{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)", + "calculated memory threshold too low", + bytes_to_mebibytes(new_threshold), bytes_to_mebibytes(current), - bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes) + bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes) ); info!(status, "discontinuing downscale"); @@ -249,14 +283,14 @@ impl Runner { // The downscaling has been approved. Downscale the file cache, then the cgroup. 
let mut status = vec![]; - let mut file_cache_mem_usage = 0; + let mut file_cache_disk_size = 0; if let Some(file_cache) = &mut self.filecache { let actual_usage = file_cache - .set_file_cache_size(expected_file_cache_mem_usage) + .set_file_cache_size(expected_file_cache_size) .await .context("failed to set file cache size")?; - if file_cache.config.in_memory { - file_cache_mem_usage = actual_usage; + if !file_cache.config.in_memory { + file_cache_disk_size = actual_usage; } let message = format!( "set file cache size to {} MiB (in memory = {})", @@ -267,24 +301,18 @@ impl Runner { status.push(message); } - if let Some(cgroup) = &self.cgroup { - let available_memory = usable_system_memory - file_cache_mem_usage; - - if file_cache_mem_usage != expected_file_cache_mem_usage { - new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory); - } - - // new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here - // since it is properly initialized in the previous cgroup if let block - cgroup - .set_memory_high_bytes(new_cgroup_mem_high) - .context("failed to set cgroup memory.high")?; + if let Some(cgroup) = &mut self.cgroup { + let new_threshold = self + .config + .cgroup_threshold(usable_system_memory, file_cache_disk_size); let message = format!( - "set cgroup memory.high to {} MiB, of new max {} MiB", - bytes_to_mebibytes(new_cgroup_mem_high), - bytes_to_mebibytes(available_memory) + "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB", + bytes_to_mebibytes(cgroup.threshold), + bytes_to_mebibytes(new_threshold), + bytes_to_mebibytes(usable_system_memory) ); + cgroup.threshold = new_threshold; info!("downscale: {message}"); status.push(message); } @@ -305,8 +333,7 @@ impl Runner { let new_mem = resources.mem; let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes); - // Get the file cache's expected contribution to the memory usage - let mut file_cache_mem_usage = 0; + let mut 
file_cache_disk_size = 0; if let Some(file_cache) = &mut self.filecache { let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory); info!( @@ -319,8 +346,8 @@ impl Runner { .set_file_cache_size(expected_usage) .await .context("failed to set file cache size")?; - if file_cache.config.in_memory { - file_cache_mem_usage = actual_usage; + if !file_cache.config.in_memory { + file_cache_disk_size = actual_usage; } if actual_usage != expected_usage { @@ -332,18 +359,18 @@ impl Runner { } } - if let Some(cgroup) = &self.cgroup { - let available_memory = usable_system_memory - file_cache_mem_usage; - let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory); + if let Some(cgroup) = &mut self.cgroup { + let new_threshold = self + .config + .cgroup_threshold(usable_system_memory, file_cache_disk_size); + info!( - target = bytes_to_mebibytes(new_cgroup_mem_high), - total = bytes_to_mebibytes(new_mem), - name = cgroup.path(), - "updating cgroup memory.high", + "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB", + bytes_to_mebibytes(cgroup.threshold), + bytes_to_mebibytes(new_threshold), + bytes_to_mebibytes(usable_system_memory) ); - cgroup - .set_memory_high_bytes(new_cgroup_mem_high) - .context("failed to set cgroup memory.high")?; + cgroup.threshold = new_threshold; } Ok(()) @@ -361,10 +388,6 @@ impl Runner { self.handle_upscale(granted) .await .context("failed to handle upscale")?; - self.dispatcher - .notify_upscale(Sequenced::new(granted)) - .await - .context("failed to notify notify cgroup of upscale")?; Ok(Some(OutboundMsg::new( OutboundMsgKind::UpscaleConfirmation {}, id, @@ -408,33 +431,53 @@ impl Runner { Err(e) => bail!("failed to receive kill signal: {e}") } } - // we need to propagate an upscale request - request = self.dispatcher.request_upscale_events.recv(), if self.cgroup.is_some() => { - if request.is_none() { - bail!("failed to listen for upscale event from cgroup") + + // New memory 
stats from the cgroup, *may* need to request upscaling, if we've + // exceeded the threshold + result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => { + result.context("failed to receive from cgroup memory stats watcher")?; + + let cgroup = self.cgroup.as_ref().unwrap(); + + let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow(); + + // If we haven't exceeded the threshold, then we're all ok + if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold { + continue; } - // If it's been less than 1 second since the last time we requested upscaling, - // ignore the event, to avoid spamming the agent (otherwise, this can happen - // ~1k times per second). + // Otherwise, we generally want upscaling. But, if it's been less than 1 second + // since the last time we requested upscaling, ignore the event, to avoid + // spamming the agent. if let Some(t) = self.last_upscale_request_at { let elapsed = t.elapsed(); if elapsed < Duration::from_secs(1) { - info!(elapsed_millis = elapsed.as_millis(), "cgroup asked for upscale but too soon to forward the request, ignoring"); + info!( + elapsed_millis = elapsed.as_millis(), + avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable), + threshold = bytes_to_mebibytes(cgroup.threshold), + "cgroup memory stats are high enough to upscale but too soon to forward the request, ignoring", + ); continue; } } self.last_upscale_request_at = Some(Instant::now()); - info!("cgroup asking for upscale; forwarding request"); + info!( + avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable), + threshold = bytes_to_mebibytes(cgroup.threshold), + "cgroup memory stats are high enough to upscale, requesting upscale", + ); + self.counter += 2; // Increment, preserving parity (i.e. keep the // counter odd). See the field comment for more. 
self.dispatcher .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter)) .await .context("failed to send message")?; - } + }, + // there is a message from the agent msg = self.dispatcher.source.next() => { if let Some(msg) = msg {