CI run for PR #10179 (#10183)

Draft · wants to merge 1 commit into base: main
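From the diff, this PR stops treating WAL backup as module-level global state (`wal_backup::init_remote_storage`, free functions like `wal_backup::delete_timeline`) and instead threads an `Arc<WalBackup>` handle through `GlobalTimelines`, `Timeline`, and their constructors. The sketch below reconstructs the handle's rough surface from the call sites in the diff; all signatures here are inferred, not the crate's actual API:

```rust
use std::sync::Arc;

use anyhow::Result;

// Stand-ins for the real types from the safekeeper/remote_storage crates.
pub struct GenericRemoteStorage;
pub struct SafeKeeperConf;
pub struct TenantTimelineId;

/// Inferred shape of the new handle: it wraps an optional remote-storage
/// client. `WalBackup::default()` (used by the benches) yields a disabled
/// backend; `new().init(&conf)` wires up real remote storage at startup.
#[derive(Default)]
pub struct WalBackup {
    storage: Option<Arc<GenericRemoteStorage>>,
}

impl WalBackup {
    pub fn new() -> Self {
        Self::default()
    }

    /// Consuming init, mirroring `WalBackup::new().init(&conf).await?` in
    /// `start_safekeeper`: a remote-storage misconfiguration becomes a
    /// startup error rather than a global side effect.
    pub async fn init(mut self, _conf: &SafeKeeperConf) -> Result<Self> {
        self.storage = Some(Arc::new(GenericRemoteStorage)); // build client from conf
        Ok(self)
    }

    /// Former free functions (`delete_timeline`, `copy_s3_segments`,
    /// `copy_partial_segment`, `read_object`) become methods like this one.
    pub async fn delete_timeline(&self, _ttid: &TenantTimelineId) -> Result<()> {
        match &self.storage {
            Some(_client) => Ok(()), // issue the deletion via the client
            None => Ok(()),          // remote storage disabled: no-op
        }
    }
}
```

The benches construct a disabled backend with `WalBackup::default()`, while `start_safekeeper` builds a real one with `WalBackup::new().init(&conf).await?`, so a failed remote-storage setup now surfaces as a startup error.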
5 changes: 4 additions & 1 deletion safekeeper/benches/benchutils.rs
```diff
@@ -6,7 +6,7 @@ use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeepe
 use safekeeper::state::{TimelinePersistentState, TimelineState};
 use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
 use safekeeper::timelines_set::TimelinesSet;
-use safekeeper::wal_backup::remote_timeline_path;
+use safekeeper::wal_backup::{remote_timeline_path, WalBackup};
 use safekeeper::{control_file, wal_storage, SafeKeeperConf};
 use tokio::fs::create_dir_all;
 use utils::id::{NodeId, TenantTimelineId};
@@ -90,12 +90,15 @@ impl Env {
         let safekeeper = self.make_safekeeper(node_id, ttid).await?;
         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
 
+        let wal_backup = Arc::new(WalBackup::default());
+
         let timeline = Timeline::new(
             ttid,
             &timeline_dir,
             &remote_path,
             shared_state,
             conf.clone(),
+            wal_backup,
         );
         timeline.bootstrap(
             &mut timeline.write_shared_state().await,
```
9 changes: 5 additions & 4 deletions safekeeper/src/bin/safekeeper.rs
```diff
@@ -8,6 +8,7 @@ use futures::future::BoxFuture;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt, StreamExt};
 use remote_storage::RemoteStorageConfig;
+use safekeeper::wal_backup::WalBackup;
 use sd_notify::NotifyState;
 use tokio::runtime::Handle;
 use tokio::signal::unix::{signal, SignalKind};
@@ -35,9 +36,9 @@ use safekeeper::http;
 use safekeeper::wal_service;
 use safekeeper::GlobalTimelines;
 use safekeeper::SafeKeeperConf;
+use safekeeper::HTTP_RUNTIME;
 use safekeeper::{broker, WAL_SERVICE_RUNTIME};
 use safekeeper::{control_file, BROKER_RUNTIME};
-use safekeeper::{wal_backup, HTTP_RUNTIME};
 use storage_broker::DEFAULT_ENDPOINT;
 use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
 use utils::{
@@ -428,15 +429,15 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
         e
     })?;
 
-    let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
+    let wal_backup = Arc::new(WalBackup::new().init(&conf).await?);
+
+    let global_timelines = Arc::new(GlobalTimelines::new(conf.clone(), wal_backup));
 
     // Register metrics collector for active timelines. It's important to do this
     // after daemonizing, otherwise process collector will be upset.
     let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
    metrics::register_internal(Box::new(timeline_collector))?;
 
-    wal_backup::init_remote_storage(&conf).await;
-
     // Keep handles to main tasks to die if any of them disappears.
     let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
         FuturesUnordered::new();
```
19 changes: 10 additions & 9 deletions safekeeper/src/copy_timeline.rs
```diff
@@ -14,7 +14,6 @@ use crate::{
     state::TimelinePersistentState,
     timeline::{TimelineError, WalResidentTimeline},
     timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
-    wal_backup::copy_s3_segments,
     wal_storage::{wal_file_paths, WalReader},
     GlobalTimelines,
 };
@@ -127,14 +126,16 @@ pub async fn handle_request(
     assert!(first_ondisk_segment <= last_segment);
    assert!(first_ondisk_segment >= first_segment);
 
-    copy_s3_segments(
-        wal_seg_size,
-        &request.source_ttid,
-        &request.destination_ttid,
-        first_segment,
-        first_ondisk_segment,
-    )
-    .await?;
+    source_tli
+        .wal_backup
+        .copy_s3_segments(
+            wal_seg_size,
+            &request.source_ttid,
+            &request.destination_ttid,
+            first_segment,
+            first_ondisk_segment,
+        )
+        .await?;
 
     copy_disk_segments(
         &source_tli,
```
23 changes: 12 additions & 11 deletions safekeeper/src/pull_timeline.rs
```diff
@@ -27,7 +27,6 @@ use crate::{
     state::{EvictionState, TimelinePersistentState},
     timeline::{Timeline, WalResidentTimeline},
     timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
-    wal_backup,
     wal_storage::open_wal_file,
     GlobalTimelines,
 };
@@ -218,11 +217,12 @@ impl Timeline {
         // Optimistically try to copy the partial segment to the destination's path: this
         // can fail if the timeline was un-evicted and modified in the background.
         let remote_timeline_path = &self.remote_path;
-        wal_backup::copy_partial_segment(
-            &replace.previous.remote_path(remote_timeline_path),
-            &replace.current.remote_path(remote_timeline_path),
-        )
-        .await?;
+        self.wal_backup
+            .copy_partial_segment(
+                &replace.previous.remote_path(remote_timeline_path),
+                &replace.current.remote_path(remote_timeline_path),
+            )
+            .await?;
 
         // Since the S3 copy succeeded with the path given in our control file snapshot, and
         // we are sending that snapshot in our response, we are giving the caller a consistent
@@ -285,11 +285,12 @@ impl WalResidentTimeline {
             );
 
             let remote_timeline_path = &self.tli.remote_path;
-            wal_backup::copy_partial_segment(
-                &replace.previous.remote_path(remote_timeline_path),
-                &replace.current.remote_path(remote_timeline_path),
-            )
-            .await?;
+            self.wal_backup
+                .copy_partial_segment(
+                    &replace.previous.remote_path(remote_timeline_path),
+                    &replace.current.remote_path(remote_timeline_path),
+                )
+                .await?;
         }
 
         let buf = control_store
```
11 changes: 9 additions & 2 deletions safekeeper/src/timeline.rs
```diff
@@ -37,7 +37,7 @@ use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, Tim
 use crate::timeline_guard::ResidenceGuard;
 use crate::timeline_manager::{AtomicStatus, ManagerCtl};
 use crate::timelines_set::TimelinesSet;
-use crate::wal_backup::{self, remote_timeline_path};
+use crate::wal_backup::{remote_timeline_path, WalBackup};
 use crate::wal_backup_partial::PartialRemoteSegment;
 
 use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
@@ -444,6 +444,8 @@ pub struct Timeline {
     manager_ctl: ManagerCtl,
     conf: Arc<SafeKeeperConf>,
 
+    pub(crate) wal_backup: Arc<WalBackup>,
+
     /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
     /// this gate, you must respect [`Timeline::cancel`]
     pub(crate) gate: Gate,
@@ -466,6 +468,7 @@ impl Timeline {
         remote_path: &RemotePath,
         shared_state: SharedState,
         conf: Arc<SafeKeeperConf>,
+        wal_backup: Arc<WalBackup>,
     ) -> Arc<Self> {
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
             watch::channel(shared_state.sk.state().commit_lsn);
@@ -498,13 +501,15 @@ impl Timeline {
             wal_backup_active: AtomicBool::new(false),
             last_removed_segno: AtomicU64::new(0),
             mgr_status: AtomicStatus::new(),
+            wal_backup,
         })
     }
 
     /// Load existing timeline from disk.
     pub fn load_timeline(
         conf: Arc<SafeKeeperConf>,
         ttid: TenantTimelineId,
+        wal_backup: Arc<WalBackup>,
     ) -> Result<Arc<Timeline>> {
         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
 
@@ -518,6 +523,7 @@ impl Timeline {
             &remote_path,
             shared_state,
             conf,
+            wal_backup,
         ))
     }
 
@@ -590,7 +596,7 @@ impl Timeline {
             // Note: we concurrently delete remote storage data from multiple
             // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
             // do some retries anyway.
-            wal_backup::delete_timeline(&self.ttid).await?;
+            self.wal_backup.delete_timeline(&self.ttid).await?;
         }
         let dir_existed = delete_dir(&self.timeline_dir).await?;
         Ok(dir_existed)
@@ -940,6 +946,7 @@ impl WalResidentTimeline {
             &persisted_state,
             start_lsn,
             enable_remote_read,
+            self.wal_backup.clone(),
         )
     }
```
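Within `timeline.rs` the change is plain constructor injection: `Timeline` now owns an `Arc<WalBackup>`, call sites move from module functions (`wal_backup::delete_timeline(&self.ttid)`) to methods on the stored handle (`self.wal_backup.delete_timeline(&self.ttid)`), and derived objects get their own clone of the handle. A self-contained illustration of that ownership shape, using hypothetical names rather than the safekeeper types:

```rust
use std::sync::Arc;

// Hypothetical stand-ins; the point is the ownership pattern, not the types.
struct Backend;

struct Parent {
    backend: Arc<Backend>, // like Timeline::wal_backup
}

struct Child {
    backend: Arc<Backend>, // like WalResidentTimeline's copy of the handle
}

impl Parent {
    fn new(backend: Arc<Backend>) -> Self {
        Self { backend }
    }

    // Mirrors `self.wal_backup.clone()` when building a WalResidentTimeline:
    // cloning an Arc bumps a reference count; the backend itself is shared.
    fn child(&self) -> Child {
        Child {
            backend: self.backend.clone(),
        }
    }
}

fn main() {
    let backend = Arc::new(Backend);
    let parent = Parent::new(backend.clone());
    let _child = parent.child();
    // One backend, three handles: the original plus parent's and child's.
    assert_eq!(Arc::strong_count(&backend), 3);
}
```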
5 changes: 2 additions & 3 deletions safekeeper/src/timeline_eviction.rs
```diff
@@ -20,7 +20,6 @@ use crate::{
     },
     rate_limit::rand_duration,
     timeline_manager::{Manager, StateSnapshot},
-    wal_backup,
     wal_backup_partial::{self, PartialRemoteSegment},
     wal_storage::wal_file_paths,
 };
@@ -195,7 +194,7 @@ async fn redownload_partial_segment(
         remote_segfile, tmp_file
     );
 
-    let mut reader = wal_backup::read_object(&remote_segfile, 0).await?;
+    let mut reader = mgr.tli.wal_backup.read_object(&remote_segfile, 0).await?;
     let mut file = File::create(&tmp_file).await?;
 
     let actual_len = tokio::io::copy(&mut reader, &mut file).await?;
@@ -275,7 +274,7 @@ async fn do_validation(
 
     let remote_segfile = remote_segment_path(mgr, partial);
     let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
-        wal_backup::read_object(&remote_segfile, 0).await?;
+        mgr.tli.wal_backup.read_object(&remote_segfile, 0).await?;
 
     // remote segment should have bytes excatly up to `flush_lsn`
     let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
```
29 changes: 21 additions & 8 deletions safekeeper/src/timelines_global_map.rs
```diff
@@ -7,6 +7,7 @@ use crate::rate_limit::RateLimiter;
 use crate::state::TimelinePersistentState;
 use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
 use crate::timelines_set::TimelinesSet;
+use crate::wal_backup::WalBackup;
 use crate::wal_storage::Storage;
 use crate::{control_file, wal_storage, SafeKeeperConf};
 use anyhow::{bail, Context, Result};
@@ -44,15 +45,24 @@ struct GlobalTimelinesState {
     conf: Arc<SafeKeeperConf>,
     broker_active_set: Arc<TimelinesSet>,
     global_rate_limiter: RateLimiter,
+    wal_backup: Arc<WalBackup>,
 }
 
 impl GlobalTimelinesState {
     /// Get dependencies for a timeline constructor.
-    fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
+    fn get_dependencies(
+        &self,
+    ) -> (
+        Arc<SafeKeeperConf>,
+        Arc<TimelinesSet>,
+        RateLimiter,
+        Arc<WalBackup>,
+    ) {
         (
             self.conf.clone(),
             self.broker_active_set.clone(),
             self.global_rate_limiter.clone(),
+            self.wal_backup.clone(),
         )
     }
 
@@ -81,14 +91,15 @@ pub struct GlobalTimelines {
 
 impl GlobalTimelines {
     /// Create a new instance of the global timelines map.
-    pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
+    pub fn new(conf: Arc<SafeKeeperConf>, wal_backup: Arc<WalBackup>) -> Self {
         Self {
             state: Mutex::new(GlobalTimelinesState {
                 timelines: HashMap::new(),
                 tombstones: HashMap::new(),
                 conf,
                 broker_active_set: Arc::new(TimelinesSet::default()),
                 global_rate_limiter: RateLimiter::new(1, 1),
+                wal_backup,
             }),
         }
     }
@@ -144,7 +155,7 @@ impl GlobalTimelines {
     /// just lock and unlock it for each timeline -- this function is called
     /// during init when nothing else is running, so this is fine.
     async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
-        let (conf, broker_active_set, partial_backup_rate_limiter) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
             let state = self.state.lock().unwrap();
             state.get_dependencies()
         };
@@ -159,7 +170,7 @@ impl GlobalTimelines {
                 TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
             {
                 let ttid = TenantTimelineId::new(tenant_id, timeline_id);
-                match Timeline::load_timeline(conf.clone(), ttid) {
+                match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
                     Ok(tli) => {
                         let mut shared_state = tli.write_shared_state().await;
                         self.state
@@ -218,7 +229,7 @@ impl GlobalTimelines {
         commit_lsn: Lsn,
         local_start_lsn: Lsn,
     ) -> Result<Arc<Timeline>> {
-        let (conf, _, _) = {
+        let (conf, _, _, _) = {
             let state = self.state.lock().unwrap();
             if let Ok(timeline) = state.get(&ttid) {
                 // Timeline already exists, return it.
@@ -264,7 +275,7 @@ impl GlobalTimelines {
         check_tombstone: bool,
     ) -> Result<Arc<Timeline>> {
         // Check for existence and mark that we're creating it.
-        let (conf, broker_active_set, partial_backup_rate_limiter) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
             let mut state = self.state.lock().unwrap();
             match state.timelines.get(&ttid) {
                 Some(GlobalMapTimeline::CreationInProgress) => {
@@ -293,7 +304,8 @@ impl GlobalTimelines {
         };
 
         // Do the actual move and reflect the result in the map.
-        match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
+        match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone(), wal_backup).await
+        {
             Ok(timeline) => {
                 let mut timeline_shared_state = timeline.write_shared_state().await;
                 let mut state = self.state.lock().unwrap();
@@ -333,6 +345,7 @@ impl GlobalTimelines {
         ttid: TenantTimelineId,
         tmp_path: &Utf8PathBuf,
         conf: Arc<SafeKeeperConf>,
+        wal_backup: Arc<WalBackup>,
     ) -> Result<Arc<Timeline>> {
         let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
         let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
@@ -374,7 +387,7 @@ impl GlobalTimelines {
         // Do the move.
         durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
 
-        Timeline::load_timeline(conf, ttid)
+        Timeline::load_timeline(conf, ttid, wal_backup)
     }
 
     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
```
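`get_dependencies` grows a fourth tuple element but keeps the existing locking discipline: callers clone what they need out of the mutex inside a short block, drop the guard, and only then do async work such as `Timeline::load_timeline`. A minimal sketch of that clone-then-release pattern, under hypothetical types:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for what GlobalTimelinesState hands out.
#[derive(Clone)]
struct RateLimiter;
struct Conf;
struct WalBackup;

struct State {
    conf: Arc<Conf>,
    rate_limiter: RateLimiter,
    wal_backup: Arc<WalBackup>,
}

impl State {
    // A tuple of cheaply clonable dependencies, like get_dependencies above.
    fn get_dependencies(&self) -> (Arc<Conf>, RateLimiter, Arc<WalBackup>) {
        (
            self.conf.clone(),
            self.rate_limiter.clone(),
            self.wal_backup.clone(),
        )
    }
}

struct Global {
    state: Mutex<State>,
}

impl Global {
    async fn load_timelines(&self) {
        // Scope the guard so it is dropped before any await point,
        // exactly as load_tenant_timelines does in the hunk above.
        let (conf, limiter, wal_backup) = {
            let state = self.state.lock().unwrap();
            state.get_dependencies()
        };
        // ...async loading happens here with the mutex already released...
        let _ = (conf, limiter, wal_backup);
    }
}
```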