safekeeper: use arc for global timelines and config (#10051)
Hello! I was interested in making some contributions to Neon, and while looking
through the issue backlog I found [8200](#8200), which seemed like a good first
issue to tackle. I see it was assigned a while ago, so apologies if I'm stepping
on any toes with this PR. I also apologize for the size of this PR; I'm not sure
there is a simple way to reduce it given the footprint of the components being
changed.

## Problem
This PR addresses part of the problem outlined in issue
[8200](#8200): namely, removing global static usage of timeline state in
favour of `Arc<GlobalTimelines>` and replacing wasteful clones of
`SafeKeeperConf` with `Arc<SafeKeeperConf>`. I did not tackle
`RemoteStorage` in this PR to limit the amount of change, as this PR is
already quite large. For the same reason I did not introduce a
`SafekeeperApp` wrapper struct, but I can tackle either or both of these
omissions in this PR if folks would like.
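
To make the intended call-pattern shift concrete, here is a small self-contained toy. The types and the `u64` key are simplified stand-ins, not the real safekeeper structs (the real map keys timelines by `TenantTimelineId` and returns `Arc<Timeline>` behind a `Result`); the point is only that lookups move from a hidden static to a value every task holds via `Arc`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Timeline;

#[derive(Default)]
struct GlobalTimelines {
    // Placeholder internals; the real map holds much more per-timeline state.
    map: Mutex<HashMap<u64, Arc<Timeline>>>,
}

impl GlobalTimelines {
    fn get(&self, ttid: u64) -> Option<Arc<Timeline>> {
        self.map.lock().unwrap().get(&ttid).cloned()
    }
}

fn main() {
    // Before: callers reached a hidden static, e.g. `GlobalTimelines::get(ttid)`.
    // After: the map is an explicit shared value and callers hold an Arc to it.
    let global_timelines = Arc::new(GlobalTimelines::default());

    let for_task = Arc::clone(&global_timelines);
    let handle = std::thread::spawn(move || {
        // Same lookup as before, but through the shared handle this task owns.
        let _maybe_tli = for_task.get(42);
    });
    handle.join().unwrap();
}
```

This mirrors how the diff below threads `global_timelines.clone()` into the `wal_service`, `http`, and `broker` tasks instead of having them call static `GlobalTimelines` functions.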

## Summary of changes
- Remove static usage of `GlobalTimelines` in favour of
`Arc<GlobalTimelines>`
- Wrap `SafeKeeperConf` in `Arc` to avoid wasteful clones of the
underlying struct (a toy sketch of this pattern follows below)
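
A similarly minimal sketch of the second bullet, with placeholder values; only `listen_pg_addr` and `wal_backup_enabled` are borrowed from fields visible in the diff, and the real struct carries far more state (which is what makes deep clones wasteful):

```rust
use std::sync::Arc;

struct SafeKeeperConf {
    listen_pg_addr: String,
    wal_backup_enabled: bool,
}

fn main() {
    // Build the config once and wrap it in Arc, as the diff does in safekeeper.rs.
    let conf = Arc::new(SafeKeeperConf {
        listen_pg_addr: "127.0.0.1:5454".to_string(),
        wal_backup_enabled: true,
    });

    // Each task takes its own handle; cloning the Arc bumps a refcount
    // instead of copying every field of the underlying struct.
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let conf = Arc::clone(&conf);
            std::thread::spawn(move || {
                println!("{} backup={}", conf.listen_pg_addr, conf.wal_backup_enabled);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```

In the diff this corresponds to building the config with `Arc::new(SafeKeeperConf { ... })` and then passing `conf.clone()` into each spawned service instead of cloning the struct itself.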

## Some additional thoughts
- We currently seem to store `SafeKeeperConf` in `GlobalTimelines` and
then expose it through a public `get_global_config` function, which
requires locking. This seems needlessly wasteful; based on observed
usage, we could remove this public accessor and force consumers to
acquire `SafeKeeperConf` through the new `Arc` reference (see the sketch below).
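
A rough sketch of that suggestion, with deliberately simplified types (the real map state and locking are more involved than this); it contrasts the locking accessor with consumers simply keeping their own `Arc<SafeKeeperConf>`:

```rust
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct SafeKeeperConf {
    my_id: u64,
}

struct GlobalTimelines {
    // Stand-in for the config currently stored inside the timelines map.
    conf: Mutex<SafeKeeperConf>,
}

impl GlobalTimelines {
    // Rough analogue of the accessor being questioned: every read takes a
    // lock and clones the config out.
    fn get_global_config(&self) -> SafeKeeperConf {
        self.conf.lock().unwrap().clone()
    }
}

fn main() {
    let conf = Arc::new(SafeKeeperConf { my_id: 1 });
    let global_timelines = Arc::new(GlobalTimelines {
        conf: Mutex::new((*conf).clone()),
    });

    // Today: go through the locking accessor.
    let via_accessor = global_timelines.get_global_config();

    // Suggestion: consumers just keep their own Arc<SafeKeeperConf>.
    let via_arc = Arc::clone(&conf);

    assert_eq!(via_accessor.my_id, via_arc.my_id);
}
```

If the accessor were dropped, call sites such as the one in `copy_timeline.rs` (which currently does `global_timelines.get_global_config()`) would presumably read the config from the `Arc` they already receive.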
oldmanfleming authored Dec 9, 2024
1 parent 4c4cb80 commit b593e51
Showing 16 changed files with 283 additions and 193 deletions.
10 changes: 8 additions & 2 deletions safekeeper/benches/benchutils.rs
@@ -83,14 +83,20 @@ impl Env {
node_id: NodeId,
ttid: TenantTimelineId,
) -> anyhow::Result<Arc<Timeline>> {
let conf = self.make_conf(node_id);
let conf = Arc::new(self.make_conf(node_id));
let timeline_dir = get_timeline_dir(&conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;

let safekeeper = self.make_safekeeper(node_id, ttid).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));

let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state);
let timeline = Timeline::new(
ttid,
&timeline_dir,
&remote_path,
shared_state,
conf.clone(),
);
timeline.bootstrap(
&mut timeline.write_shared_state().await,
&conf,

36 changes: 22 additions & 14 deletions safekeeper/src/bin/safekeeper.rs
@@ -338,7 +338,7 @@ async fn main() -> anyhow::Result<()> {
}
};

let conf = SafeKeeperConf {
let conf = Arc::new(SafeKeeperConf {
workdir,
my_id: id,
listen_pg_addr: args.listen_pg,
@@ -368,7 +368,7 @@ async fn main() -> anyhow::Result<()> {
control_file_save_interval: args.control_file_save_interval,
partial_backup_concurrency: args.partial_backup_concurrency,
eviction_min_resident: args.eviction_min_resident,
};
});

// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(
@@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> {
/// complete, e.g. panicked, inner is error produced by task itself.
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;

async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
// fsync the datadir to make sure we have a consistent state on disk.
if !conf.no_sync {
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
@@ -428,9 +428,11 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
e
})?;

let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));

// Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset.
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
metrics::register_internal(Box::new(timeline_collector))?;

wal_backup::init_remote_storage(&conf).await;
@@ -447,9 +449,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.then(|| Handle::try_current().expect("no runtime in main"));

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone()).await?;
global_timelines.init().await?;

let conf_ = conf.clone();
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");
@@ -459,55 +460,62 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(
conf_,
conf.clone(),
pg_listener,
Scope::SafekeeperData,
global_timelines.clone(),
))
// wrap with task name for error reporting
.map(|res| ("WAL service main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));

let global_timelines_ = global_timelines.clone();
let timeline_housekeeping_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(async move {
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
loop {
tokio::time::sleep(TOMBSTONE_TTL).await;
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
global_timelines_.housekeeping(&TOMBSTONE_TTL);
}
})
.map(|res| ("Timeline map housekeeping".to_owned(), res));
tasks_handles.push(Box::pin(timeline_housekeeping_handle));

if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let conf_ = conf.clone();
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(
conf_,
conf.clone(),
pg_listener_tenant_only,
Scope::Tenant,
global_timelines.clone(),
))
// wrap with task name for error reporting
.map(|res| ("WAL service tenant only main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
}

let conf_ = conf.clone();
let http_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| HTTP_RUNTIME.handle())
.spawn(http::task_main(conf_, http_listener))
.spawn(http::task_main(
conf.clone(),
http_listener,
global_timelines.clone(),
))
.map(|res| ("HTTP service main".to_owned(), res));
tasks_handles.push(Box::pin(http_handle));

let conf_ = conf.clone();
let broker_task_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| BROKER_RUNTIME.handle())
.spawn(broker::task_main(conf_).instrument(info_span!("broker")))
.spawn(
broker::task_main(conf.clone(), global_timelines.clone())
.instrument(info_span!("broker")),
)
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));

37 changes: 26 additions & 11 deletions safekeeper/src/broker.rs
@@ -39,14 +39,17 @@ const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;

/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
async fn push_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
if conf.disable_periodic_broker_push {
info!("broker push_loop is disabled, doing nothing...");
futures::future::pending::<()>().await; // sleep forever
return Ok(());
}

let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
let active_timelines_set = global_timelines.get_global_broker_active_set();

let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
@@ -87,8 +90,13 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {

/// Subscribe and fetch all the interesting data from the broker.
#[instrument(name = "broker_pull", skip_all)]
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
async fn pull_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
stats: Arc<BrokerStats>,
) -> Result<()> {
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;

// TODO: subscribe only to local timelines instead of all
let request = SubscribeSafekeeperInfoRequest {
@@ -113,7 +121,7 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()>
.as_ref()
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
let ttid = parse_proto_ttid(proto_ttid)?;
if let Ok(tli) = GlobalTimelines::get(ttid) {
if let Ok(tli) = global_timelines.get(ttid) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
@@ -135,7 +143,11 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()>

/// Process incoming discover requests. This is done in a separate task to avoid
/// interfering with the normal pull/push loops.
async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
async fn discover_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
stats: Arc<BrokerStats>,
) -> Result<()> {
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;

@@ -171,7 +183,7 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<
.as_ref()
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
let ttid = parse_proto_ttid(proto_ttid)?;
if let Ok(tli) = GlobalTimelines::get(ttid) {
if let Ok(tli) = global_timelines.get(ttid) {
// we received a discovery request for a timeline we know about
discover_counter.inc();

@@ -210,7 +222,10 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<
bail!("end of stream");
}

pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
pub async fn task_main(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
info!("started, broker endpoint {:?}", conf.broker_endpoint);

let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
@@ -261,13 +276,13 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
},
_ = ticker.tick() => {
if push_handle.is_none() {
push_handle = Some(tokio::spawn(push_loop(conf.clone())));
push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
}
if pull_handle.is_none() {
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
}
if discover_handle.is_none() {
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
}
},
_ = &mut stats_task => {}

26 changes: 15 additions & 11 deletions safekeeper/src/copy_timeline.rs
@@ -1,9 +1,7 @@
use std::sync::Arc;

use anyhow::{bail, Result};
use camino::Utf8PathBuf;

use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use std::sync::Arc;
use tokio::{
fs::OpenOptions,
io::{AsyncSeekExt, AsyncWriteExt},
@@ -14,7 +12,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
control_file::FileStorage,
state::TimelinePersistentState,
timeline::{Timeline, TimelineError, WalResidentTimeline},
timeline::{TimelineError, WalResidentTimeline},
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
@@ -25,16 +23,19 @@ use crate::{
const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;

pub struct Request {
pub source: Arc<Timeline>,
pub source_ttid: TenantTimelineId,
pub until_lsn: Lsn,
pub destination_ttid: TenantTimelineId,
}

pub async fn handle_request(request: Request) -> Result<()> {
pub async fn handle_request(
request: Request,
global_timelines: Arc<GlobalTimelines>,
) -> Result<()> {
// TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
// if LSN will point to the middle of a WAL record, timeline will be in "broken" state

match GlobalTimelines::get(request.destination_ttid) {
match global_timelines.get(request.destination_ttid) {
// timeline already exists. would be good to check that this timeline is the copy
// of the source timeline, but it isn't obvious how to do that
Ok(_) => return Ok(()),
@@ -46,9 +47,10 @@ pub async fn handle_request(request: Request) -> Result<()> {
}
}

let source_tli = request.source.wal_residence_guard().await?;
let source = global_timelines.get(request.source_ttid)?;
let source_tli = source.wal_residence_guard().await?;

let conf = &GlobalTimelines::get_global_config();
let conf = &global_timelines.get_global_config();
let ttid = request.destination_ttid;

let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
@@ -127,7 +129,7 @@ pub async fn handle_request(request: Request) -> Result<()> {

copy_s3_segments(
wal_seg_size,
&request.source.ttid,
&request.source_ttid,
&request.destination_ttid,
first_segment,
first_ondisk_segment,
@@ -158,7 +160,9 @@ pub async fn handle_request(request: Request) -> Result<()> {

// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
global_timelines
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
.await?;

Ok(())
}

18 changes: 9 additions & 9 deletions safekeeper/src/debug_dump.rs
@@ -207,23 +207,23 @@ pub struct FileInfo {
}

/// Build debug dump response, using the provided [`Args`] filters.
pub async fn build(args: Args) -> Result<Response> {
pub async fn build(args: Args, global_timelines: Arc<GlobalTimelines>) -> Result<Response> {
let start_time = Utc::now();
let timelines_count = GlobalTimelines::timelines_count();
let config = GlobalTimelines::get_global_config();
let timelines_count = global_timelines.timelines_count();
let config = global_timelines.get_global_config();

let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
// If both tenant_id and timeline_id are specified, we can just get the
// timeline directly, without taking a snapshot of the whole list.
let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
if let Ok(tli) = GlobalTimelines::get(ttid) {
if let Ok(tli) = global_timelines.get(ttid) {
vec![tli]
} else {
vec![]
}
} else {
// Otherwise, take a snapshot of the whole list.
GlobalTimelines::get_all()
global_timelines.get_all()
};

let mut timelines = Vec::new();
@@ -344,12 +344,12 @@ fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {

/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: SafeKeeperConf) -> Config {
fn build_config(config: Arc<SafeKeeperConf>) -> Config {
Config {
id: config.my_id,
workdir: config.workdir.into(),
listen_pg_addr: config.listen_pg_addr,
listen_http_addr: config.listen_http_addr,
workdir: config.workdir.clone().into(),
listen_pg_addr: config.listen_pg_addr.clone(),
listen_http_addr: config.listen_http_addr.clone(),
no_sync: config.no_sync,
max_offloader_lag_bytes: config.max_offloader_lag_bytes,
wal_backup_enabled: config.wal_backup_enabled,

1 comment on commit b593e51

@github-actions

7201 tests run: 6876 passed, 0 failed, 325 skipped (full report)


Flaky tests (7)

  • Postgres 16
  • Postgres 15
  • Postgres 14

Code coverage* (full report)

  • functions: 31.4% (8335 of 26527 functions)
  • lines: 47.7% (65618 of 137521 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
b593e51 at 2024-12-09T23:22:15.695Z :recycle:
