Improve safekeepers eviction rate limiting (#8456)
This commit tries to fix regular load spikes on staging caused by too
many eviction and partial upload operations running at the same time.
The spikes usually appeared after a restart; for partial backup the load
showed up with a delay.
- Add a semaphore for evictions (2 permits by default)
- Rename `resident_since` to `evict_not_before` and smooth out the eviction
curve by adding a random duration
- Use a random duration in partial uploads as well

Related to #6338. Some discussion in
https://neondb.slack.com/archives/C033RQ5SPDH/p1720601531744029
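
Below is a minimal, self-contained sketch of the pattern this commit introduces (illustrative only, not the safekeeper code itself): a shared tokio semaphore caps how many evictions run at once, and each task jitters its wake-up with a random fraction of the base interval so work does not pile up right after a restart. The helper names, the task count, and the dependencies (tokio = { version = "1", features = ["full"] }, rand = "0.8") are assumptions made for this example.

use std::{sync::Arc, time::Duration};

use rand::Rng;
use tokio::sync::Semaphore;

/// Jitter helper, analogous to `rand_duration` in the diff below:
/// returns a uniformly random fraction of `d` in [0, d).
fn rand_fraction(d: Duration) -> Duration {
    d.mul_f64(rand::thread_rng().gen_range(0.0..1.0))
}

/// One background task competing for an eviction permit (hypothetical helper).
async fn run_eviction_task(evictions: Arc<Semaphore>, min_resident: Duration) {
    // Initial jitter: do not start work immediately after restart.
    tokio::time::sleep(rand_fraction(min_resident)).await;
    loop {
        match evictions.clone().try_acquire_owned() {
            Ok(_permit) => {
                // Only as many evictions as there are permits run concurrently.
                // ... evict one timeline here; the permit is dropped on return ...
                return;
            }
            Err(_) => {
                // No permit available: back off for another jittered delay.
                tokio::time::sleep(rand_fraction(min_resident)).await;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let evictions = Arc::new(Semaphore::new(2)); // mirrors DEFAULT_EVICTION_CONCURRENCY
    let min_resident = Duration::from_secs(5); // shortened for the example
    let tasks: Vec<_> = (0..10)
        .map(|_| tokio::spawn(run_eviction_task(evictions.clone(), min_resident)))
        .collect();
    for t in tasks {
        t.await.expect("eviction task panicked");
    }
}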
petuhovskiy authored Aug 2, 2024
1 parent 8c828c5 commit f3acfb2
Showing 7 changed files with 112 additions and 49 deletions.
2 changes: 2 additions & 0 deletions safekeeper/src/lib.rs
@@ -21,6 +21,7 @@ pub mod json_ctrl;
pub mod metrics;
pub mod patch_control_file;
pub mod pull_timeline;
pub mod rate_limit;
pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
@@ -53,6 +54,7 @@ pub mod defaults {
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;

// By default, our required residency before eviction is the same as the period that passes
// before uploading a partial segment, so that in normal operation the eviction can happen
49 changes: 49 additions & 0 deletions safekeeper/src/rate_limit.rs
@@ -0,0 +1,49 @@
use std::sync::Arc;

use rand::Rng;

use crate::metrics::MISC_OPERATION_SECONDS;

/// Global rate limiter for background tasks.
#[derive(Clone)]
pub struct RateLimiter {
partial_backup: Arc<tokio::sync::Semaphore>,
eviction: Arc<tokio::sync::Semaphore>,
}

impl RateLimiter {
/// Create a new rate limiter.
/// - `partial_backup_max`: maximum number of concurrent partial backups.
/// - `eviction_max`: maximum number of concurrent timeline evictions.
pub fn new(partial_backup_max: usize, eviction_max: usize) -> Self {
Self {
partial_backup: Arc::new(tokio::sync::Semaphore::new(partial_backup_max)),
eviction: Arc::new(tokio::sync::Semaphore::new(eviction_max)),
}
}

/// Get a permit for partial backup. This will block if the maximum number of concurrent
/// partial backups is reached.
pub async fn acquire_partial_backup(&self) -> tokio::sync::OwnedSemaphorePermit {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_permit_acquire"])
.start_timer();
self.partial_backup
.clone()
.acquire_owned()
.await
.expect("semaphore is closed")
}

/// Try to get a permit for timeline eviction. This will return None if the maximum number of
/// concurrent timeline evictions is reached.
pub fn try_acquire_eviction(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
self.eviction.clone().try_acquire_owned().ok()
}
}

/// Generate a random duration that is a fraction of the given duration.
pub fn rand_duration(duration: &std::time::Duration) -> std::time::Duration {
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
duration.mul_f64(randf64)
}
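
For reference, a test-style snippet (not part of the commit) that could live inside the safekeeper crate and exercises the contract the callers rely on: `rand_duration` returns a random fraction of its input, so the jitter never exceeds the base interval.

#[cfg(test)]
mod rand_duration_sketch {
    use std::time::Duration;

    use crate::rate_limit::rand_duration;

    #[test]
    fn jitter_never_exceeds_the_base_interval() {
        let base = Duration::from_secs(600);
        for _ in 0..1_000 {
            let jitter = rand_duration(&base);
            // The random factor is drawn from [0.0, 1.0), so after nanosecond
            // rounding the result can at most equal `base`.
            assert!(jitter <= base);
        }
    }
}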
3 changes: 2 additions & 1 deletion safekeeper/src/timeline.rs
@@ -25,6 +25,7 @@ use utils::{
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;

use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
@@ -36,7 +37,7 @@ use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};

use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
6 changes: 3 additions & 3 deletions safekeeper/src/timeline_eviction.rs
@@ -5,7 +5,6 @@
use anyhow::Context;
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use std::time::Instant;
use tokio::{
fs::File,
io::{AsyncRead, AsyncWriteExt},
@@ -15,6 +14,7 @@ use utils::crashsafe::durable_rename;

use crate::{
metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
rate_limit::rand_duration,
timeline_manager::{Manager, StateSnapshot},
wal_backup,
wal_backup_partial::{self, PartialRemoteSegment},
@@ -50,7 +50,6 @@ impl Manager {
.flush_lsn
.segment_number(self.wal_seg_size)
== self.last_removed_segno + 1
&& self.resident_since.elapsed() >= self.conf.eviction_min_resident
}

/// Evict the timeline to remote storage.
@@ -112,7 +111,8 @@ impl Manager {
return;
}

self.resident_since = Instant::now();
self.evict_not_before =
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);

info!("successfully restored evicted timeline");
}
48 changes: 35 additions & 13 deletions safekeeper/src/timeline_manager.rs
@@ -23,6 +23,7 @@ use utils::lsn::Lsn;
use crate::{
control_file::{FileStorage, Storage},
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
rate_limit::{rand_duration, RateLimiter},
recovery::recovery_main,
remove_wal::calc_horizon_lsn,
safekeeper::Term,
@@ -32,7 +33,7 @@ use crate::{
timeline_guard::{AccessService, GuardId, ResidenceGuard},
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},
wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
wal_backup_partial::{self, PartialRemoteSegment},
SafeKeeperConf,
};

@@ -185,11 +186,11 @@ pub(crate) struct Manager {

// misc
pub(crate) access_service: AccessService,
pub(crate) partial_backup_rate_limiter: RateLimiter,
pub(crate) global_rate_limiter: RateLimiter,

// Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
// evict them if they go inactive very soon after being restored.
pub(crate) resident_since: std::time::Instant,
pub(crate) evict_not_before: Instant,
}

/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
@@ -202,7 +203,7 @@ pub async fn main_task(
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
global_rate_limiter: RateLimiter,
) {
tli.set_status(Status::Started);

@@ -220,7 +221,7 @@ conf,
conf,
broker_active_set,
manager_tx,
partial_backup_rate_limiter,
global_rate_limiter,
)
.await;

@@ -254,9 +255,29 @@ mgr.set_status(Status::UpdatePartialBackup);
mgr.set_status(Status::UpdatePartialBackup);
mgr.update_partial_backup(&state_snapshot).await;

if mgr.conf.enable_offload && mgr.ready_for_eviction(&next_event, &state_snapshot) {
mgr.set_status(Status::EvictTimeline);
mgr.evict_timeline().await;
let now = Instant::now();
if mgr.evict_not_before > now {
// we should wait until evict_not_before
update_next_event(&mut next_event, mgr.evict_not_before);
}

if mgr.conf.enable_offload
&& mgr.evict_not_before <= now
&& mgr.ready_for_eviction(&next_event, &state_snapshot)
{
// check rate limiter and evict timeline if possible
match mgr.global_rate_limiter.try_acquire_eviction() {
Some(_permit) => {
mgr.set_status(Status::EvictTimeline);
mgr.evict_timeline().await;
}
None => {
// we can't evict timeline now, will try again later
mgr.evict_not_before =
Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
update_next_event(&mut next_event, mgr.evict_not_before);
}
}
}
}

@@ -334,11 +355,10 @@ impl Manager {
conf: SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
global_rate_limiter: RateLimiter,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {
conf,
wal_seg_size: tli.get_wal_seg_size().await,
walsenders: tli.get_walsenders().clone(),
state_version_rx: tli.get_state_version_rx(),
@@ -353,8 +373,10 @@ partial_backup_uploaded,
partial_backup_uploaded,
access_service: AccessService::new(manager_tx),
tli,
partial_backup_rate_limiter,
resident_since: std::time::Instant::now(),
global_rate_limiter,
// to smooth out evictions spike after restart
evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
conf,
}
}

@@ -541,7 +563,7 @@ impl Manager {
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
self.wal_resident_timeline(),
self.conf.clone(),
self.partial_backup_rate_limiter.clone(),
self.global_rate_limiter.clone(),
)));
}

14 changes: 9 additions & 5 deletions safekeeper/src/timelines_global_map.rs
@@ -2,10 +2,11 @@
//! All timelines should always be present in this map, this is done by loading them
//! all from the disk on startup and keeping them in memory.
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::ServerInfo;
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup_partial::RateLimiter;
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
@@ -31,7 +32,7 @@ struct GlobalTimelinesState {
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
partial_backup_rate_limiter: RateLimiter,
global_rate_limiter: RateLimiter,
}

// Used to prevent concurrent timeline loading.
@@ -50,7 +51,7 @@ impl GlobalTimelinesState {
(
self.get_conf().clone(),
self.broker_active_set.clone(),
self.partial_backup_rate_limiter.clone(),
self.global_rate_limiter.clone(),
)
}

@@ -85,7 +86,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
partial_backup_rate_limiter: RateLimiter::new(1),
global_rate_limiter: RateLimiter::new(1, 1),
})
});

@@ -99,7 +100,10 @@ impl GlobalTimelines {
// lock, so use explicit block
let tenants_dir = {
let mut state = TIMELINES_STATE.lock().unwrap();
state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
state.global_rate_limiter = RateLimiter::new(
conf.partial_backup_concurrency,
DEFAULT_EVICTION_CONCURRENCY,
);
state.conf = Some(conf);

// Iterate through all directories and load tenants for all directories
39 changes: 12 additions & 27 deletions safekeeper/src/wal_backup_partial.rs
@@ -18,8 +18,6 @@
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use std::sync::Arc;

use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
@@ -30,37 +28,14 @@ use utils::lsn::Lsn;

use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
rate_limit::{rand_duration, RateLimiter},
safekeeper::Term,
timeline::WalResidentTimeline,
timeline_manager::StateSnapshot,
wal_backup::{self, remote_timeline_path},
SafeKeeperConf,
};

#[derive(Clone)]
pub struct RateLimiter {
semaphore: Arc<tokio::sync::Semaphore>,
}

impl RateLimiter {
pub fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
}
}

async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_permit_acquire"])
.start_timer();
self.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore is closed")
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
/// Upload is in progress. This status should be used only for garbage collection,
@@ -352,6 +327,7 @@ pub async fn main_task(
) -> Option<PartialRemoteSegment> {
debug!("started");
let await_duration = conf.partial_backup_timeout;
let mut first_iteration = true;

let (_, persistent_state) = tli.get_state().await;
let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
@@ -419,6 +395,15 @@
}
}

// smoothing the load after restart, by sleeping for a random time.
// if this is not the first iteration, we will wait for the full await_duration
let await_duration = if first_iteration {
first_iteration = false;
rand_duration(&await_duration)
} else {
await_duration
};

// fixing the segno and waiting some time to prevent reuploading the same segment too often
let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
let timeout = tokio::time::sleep(await_duration);
@@ -454,7 +439,7 @@ }
}

// limit concurrent uploads
let _upload_permit = limiter.acquire_owned().await;
let _upload_permit = limiter.acquire_partial_backup().await;

let prepared = backup.prepare_upload().await;
if let Some(seg) = &uploaded_segment {

1 comment on commit f3acfb2

@github-actions

2192 tests run: 2109 passed, 4 failed, 79 skipped (full report)


Failures on Postgres 14

  • test_gc_feedback_with_snapshots[github-actions-selfhosted]: release
  • test_gc_feedback[github-actions-selfhosted]: release
  • test_heavy_write_workload[neon_on-github-actions-selfhosted-10-5-5]: release
  • test_storage_controller_many_tenants[github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_gc_feedback_with_snapshots[release-pg14-github-actions-selfhosted] or test_gc_feedback[release-pg14-github-actions-selfhosted] or test_heavy_write_workload[neon_on-release-pg14-github-actions-selfhosted-10-5-5] or test_storage_controller_many_tenants[release-pg14-github-actions-selfhosted]"

Code coverage* (full report)

  • functions: 32.8% (7125 of 21694 functions)
  • lines: 50.4% (57369 of 113806 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
f3acfb2 at 2024-08-02T18:17:57.871Z :recycle:
