diff --git a/Cargo.lock b/Cargo.lock
index d1f77469699e..73aa216fe1a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5565,10 +5565,12 @@ dependencies = [
 name = "safekeeper_api"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "const_format",
  "postgres_ffi",
  "pq_proto",
  "serde",
+ "serde_json",
  "tokio",
  "utils",
 ]
diff --git a/libs/safekeeper_api/Cargo.toml b/libs/safekeeper_api/Cargo.toml
index 4234ec6779a2..7652c3d4137a 100644
--- a/libs/safekeeper_api/Cargo.toml
+++ b/libs/safekeeper_api/Cargo.toml
@@ -5,8 +5,10 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+anyhow.workspace = true
 const_format.workspace = true
 serde.workspace = true
+serde_json.workspace = true
 postgres_ffi.workspace = true
 pq_proto.workspace = true
 tokio.workspace = true
diff --git a/libs/safekeeper_api/src/lib.rs b/libs/safekeeper_api/src/lib.rs
index be6923aca902..fa86523ad7f5 100644
--- a/libs/safekeeper_api/src/lib.rs
+++ b/libs/safekeeper_api/src/lib.rs
@@ -4,12 +4,15 @@ use const_format::formatcp;
 use pq_proto::SystemId;
 use serde::{Deserialize, Serialize};
 
+pub mod membership;
 /// Public API types
 pub mod models;
 
 /// Consensus logical timestamp. Note: it is a part of sk control file.
 pub type Term = u64;
-pub const INVALID_TERM: Term = 0;
+/// The term with which a timeline is initially created. It is a normal term,
+/// except that a walproposer is never elected with it.
+pub const INITIAL_TERM: Term = 0;
 
 /// Information about Postgres. Safekeeper gets it once and then verifies all
 /// further connections from computes match. Note: it is a part of sk control
diff --git a/libs/safekeeper_api/src/membership.rs b/libs/safekeeper_api/src/membership.rs
new file mode 100644
index 000000000000..fe302045451f
--- /dev/null
+++ b/libs/safekeeper_api/src/membership.rs
@@ -0,0 +1,164 @@
+//! Types defining safekeeper membership, see
+//! rfcs/035-safekeeper-dynamic-membership-change.md
+//! for details.
+
+use std::{collections::HashSet, fmt::Display};
+
+use anyhow;
+use anyhow::bail;
+use serde::{Deserialize, Serialize};
+use utils::id::NodeId;
+
+/// Number uniquely identifying safekeeper configuration.
+/// Note: it is a part of sk control file.
+pub type Generation = u32;
+/// 1 is the first valid generation, 0 is used as
+/// a placeholder before we fully migrate to generations.
+pub const INVALID_GENERATION: Generation = 0;
+pub const INITIAL_GENERATION: Generation = 1;
+
+/// Membership is defined by ids, so e.g. the walproposer uses them to figure
+/// out quorums, but we also carry host and port to give the walproposer an
+/// idea of where to connect.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct SafekeeperId {
+    pub id: NodeId,
+    pub host: String,
+    pub pg_port: u16,
+}
+
+impl Display for SafekeeperId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "[id={}, ep={}:{}]", self.id, self.host, self.pg_port)
+    }
+}
+
+/// Set of safekeepers.
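+/// Serialized as a bare JSON array of members (the struct is
+/// #[serde(transparent)] below); MemberSet::new rejects duplicate node ids.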
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[serde(transparent)]
+pub struct MemberSet {
+    pub members: Vec<SafekeeperId>,
+}
+
+impl MemberSet {
+    pub fn empty() -> Self {
+        MemberSet {
+            members: Vec::new(),
+        }
+    }
+
+    pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
+        let hs: HashSet<NodeId> = HashSet::from_iter(members.iter().map(|sk| sk.id));
+        if hs.len() != members.len() {
+            bail!("duplicate safekeeper id in the set {:?}", members);
+        }
+        Ok(MemberSet { members })
+    }
+
+    pub fn contains(&self, sk: &SafekeeperId) -> bool {
+        self.members.iter().any(|m| m.id == sk.id)
+    }
+
+    pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
+        if self.contains(&sk) {
+            bail!(format!(
+                "sk {} is already member of the set {}",
+                sk.id, self
+            ));
+        }
+        self.members.push(sk);
+        Ok(())
+    }
+}
+
+impl Display for MemberSet {
+    /// Display as a comma separated list of members.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let sks_str = self
+            .members
+            .iter()
+            .map(|m| m.to_string())
+            .collect::<Vec<String>>();
+        write!(f, "({})", sks_str.join(", "))
+    }
+}
+
+/// Safekeeper membership configuration.
+/// Note: it is a part of both control file and http API.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct Configuration {
+    /// Unique id.
+    pub generation: Generation,
+    /// Current members of the configuration.
+    pub members: MemberSet,
+    /// Some means it is a joint conf.
+    pub new_members: Option<MemberSet>,
+}
+
+impl Configuration {
+    /// Used for pre-generations timelines, will be removed eventually.
+    pub fn empty() -> Self {
+        Configuration {
+            generation: INVALID_GENERATION,
+            members: MemberSet::empty(),
+            new_members: None,
+        }
+    }
+}
+
+impl Display for Configuration {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "gen={}, members={}, new_members={}",
+            self.generation,
+            self.members,
+            self.new_members
+                .as_ref()
+                .map(ToString::to_string)
+                .unwrap_or(String::from("none"))
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{MemberSet, SafekeeperId};
+    use utils::id::NodeId;
+
+    #[test]
+    fn test_member_set() {
+        let mut members = MemberSet::empty();
+        members
+            .add(SafekeeperId {
+                id: NodeId(42),
+                host: String::from("lala.org"),
+                pg_port: 5432,
+            })
+            .unwrap();
+
+        members
+            .add(SafekeeperId {
+                id: NodeId(42),
+                host: String::from("lala.org"),
+                pg_port: 5432,
+            })
+            .expect_err("duplicate must not be allowed");
+
+        members
+            .add(SafekeeperId {
+                id: NodeId(43),
+                host: String::from("bubu.org"),
+                pg_port: 5432,
+            })
+            .unwrap();
+
+        println!("members: {}", members);
+
+        let j = serde_json::to_string(&members).expect("failed to serialize");
+        println!("members json: {}", j);
+        assert_eq!(
+            j,
+            r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#
+        );
+    }
+}
diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 3e424a792c7f..ad38986357fb 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -11,7 +11,7 @@ use utils::{
     pageserver_feedback::PageserverFeedback,
 };
 
-use crate::{ServerInfo, Term};
+use crate::{membership::Configuration, ServerInfo, Term};
 
 #[derive(Debug, Serialize)]
 pub struct SafekeeperStatus {
@@ -22,13 +22,16 @@ pub struct TimelineCreateRequest {
     pub tenant_id: TenantId,
     pub timeline_id: TimelineId,
-    pub peer_ids: Option<Vec<NodeId>>,
+    pub mconf: Configuration,
     pub pg_version: u32,
     pub system_id: Option<u64>,
+    // By default WAL_SEGMENT_SIZE
     pub wal_seg_size:
Option, - pub commit_lsn: Lsn, - // If not passed, it is assigned to the beginning of commit_lsn segment. - pub local_start_lsn: Option, + pub start_lsn: Lsn, + // Normal creation should omit this field (start_lsn initializes all LSNs). + // However, we allow specifying custom value higher than start_lsn for + // manual recovery case, see test_s3_wal_replay. + pub commit_lsn: Option, } /// Same as TermLsn, but serializes LSN using display serializer diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index 06e5afbf74ec..e92ca881e15e 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -3,6 +3,7 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use camino::{Utf8Path, Utf8PathBuf}; +use safekeeper_api::membership::INVALID_GENERATION; use tokio::fs::File; use tokio::io::AsyncWriteExt; use utils::crashsafe::durable_rename; @@ -13,14 +14,14 @@ use std::ops::Deref; use std::path::Path; use std::time::Instant; -use crate::control_file_upgrade::downgrade_v9_to_v8; +use crate::control_file_upgrade::downgrade_v10_to_v9; use crate::control_file_upgrade::upgrade_control_file; use crate::metrics::PERSIST_CONTROL_FILE_SECONDS; use crate::state::{EvictionState, TimelinePersistentState}; use utils::bin_ser::LeSer; pub const SK_MAGIC: u32 = 0xcafeceefu32; -pub const SK_FORMAT_VERSION: u32 = 9; +pub const SK_FORMAT_VERSION: u32 = 10; // contains persistent metadata for safekeeper pub const CONTROL_FILE_NAME: &str = "safekeeper.control"; @@ -169,10 +170,11 @@ impl TimelinePersistentState { let mut buf: Vec = Vec::new(); WriteBytesExt::write_u32::(&mut buf, SK_MAGIC)?; - if self.eviction_state == EvictionState::Present { - // temp hack for forward compatibility - const PREV_FORMAT_VERSION: u32 = 8; - let prev = downgrade_v9_to_v8(self); + if self.mconf.generation == INVALID_GENERATION { + // Temp hack for forward compatibility test: in case of none + // configuration save cfile in previous v9 format. + const PREV_FORMAT_VERSION: u32 = 9; + let prev = downgrade_v10_to_v9(self); WriteBytesExt::write_u32::(&mut buf, PREV_FORMAT_VERSION)?; prev.ser_into(&mut buf)?; } else { @@ -233,6 +235,7 @@ impl Storage for FileStorage { #[cfg(test)] mod test { use super::*; + use safekeeper_api::membership::{Configuration, MemberSet}; use tokio::fs; use utils::lsn::Lsn; @@ -242,6 +245,11 @@ mod test { async fn test_read_write_safekeeper_state() -> anyhow::Result<()> { let tempdir = camino_tempfile::tempdir()?; let mut state = TimelinePersistentState::empty(); + state.mconf = Configuration { + generation: 42, + members: MemberSet::empty(), + new_members: None, + }; let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?; // Make a change. diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index dd152fd4cce8..904e79f976eb 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,17 +1,22 @@ //! 
Code to deal with safekeeper control file upgrades +use std::vec; + use crate::{ safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn}, - state::{EvictionState, PersistedPeers, TimelinePersistentState}, + state::{EvictionState, TimelinePersistentState}, wal_backup_partial, }; use anyhow::{bail, Result}; use pq_proto::SystemId; -use safekeeper_api::{ServerInfo, Term}; +use safekeeper_api::{ + membership::{Configuration, INVALID_GENERATION}, + ServerInfo, Term, +}; use serde::{Deserialize, Serialize}; use tracing::*; use utils::{ bin_ser::LeSer, - id::{TenantId, TimelineId}, + id::{NodeId, TenantId, TimelineId}, lsn::Lsn, }; @@ -233,6 +238,90 @@ pub struct SafeKeeperStateV8 { pub partial_backup: wal_backup_partial::State, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>); + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PersistedPeerInfo { + /// LSN up to which safekeeper offloaded WAL to s3. + pub backup_lsn: Lsn, + /// Term of the last entry. + pub term: Term, + /// LSN of the last record. + pub flush_lsn: Lsn, + /// Up to which LSN safekeeper regards its WAL as committed. + pub commit_lsn: Lsn, +} + +impl PersistedPeerInfo { + pub fn new() -> Self { + Self { + backup_lsn: Lsn::INVALID, + term: safekeeper_api::INITIAL_TERM, + flush_lsn: Lsn(0), + commit_lsn: Lsn(0), + } + } +} + +// make clippy happy +impl Default for PersistedPeerInfo { + fn default() -> Self { + Self::new() + } +} + +/// Note: SafekeeperStateVn is old name for TimelinePersistentStateVn. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TimelinePersistentStateV9 { + #[serde(with = "hex")] + pub tenant_id: TenantId, + #[serde(with = "hex")] + pub timeline_id: TimelineId, + /// persistent acceptor state + pub acceptor_state: AcceptorState, + /// information about server + pub server: ServerInfo, + /// Unique id of the last *elected* proposer we dealt with. Not needed + /// for correctness, exists for monitoring purposes. + #[serde(with = "hex")] + pub proposer_uuid: PgUuid, + /// Since which LSN this timeline generally starts. Safekeeper might have + /// joined later. + pub timeline_start_lsn: Lsn, + /// Since which LSN safekeeper has (had) WAL for this timeline. + /// All WAL segments next to one containing local_start_lsn are + /// filled with data from the beginning. + pub local_start_lsn: Lsn, + /// Part of WAL acknowledged by quorum *and available locally*. Always points + /// to record boundary. + pub commit_lsn: Lsn, + /// LSN that points to the end of the last backed up segment. Useful to + /// persist to avoid finding out offloading progress on boot. + pub backup_lsn: Lsn, + /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn + /// of last record streamed to everyone). Persisting it helps skipping + /// recovery in walproposer, generally we compute it from peers. In + /// walproposer proto called 'truncate_lsn'. Updates are currently drived + /// only by walproposer. + pub peer_horizon_lsn: Lsn, + /// LSN of the oldest known checkpoint made by pageserver and successfully + /// pushed to s3. We don't remove WAL beyond it. Persisted only for + /// informational purposes, we receive it from pageserver (or broker). + pub remote_consistent_lsn: Lsn, + /// Peers and their state as we remember it. Knowing peers themselves is + /// fundamental; but state is saved here only for informational purposes and + /// obviously can be stale. 
(Currently not saved at all, but let's provision + /// place to have less file version upgrades). + pub peers: PersistedPeers, + /// Holds names of partial segments uploaded to remote storage. Used to + /// clean up old objects without leaving garbage in remote storage. + pub partial_backup: wal_backup_partial::State, + /// Eviction state of the timeline. If it's Offloaded, we should download + /// WAL files from remote storage to serve the timeline. + pub eviction_state: EvictionState, +} + pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result { // migrate to storing full term history if version == 1 { @@ -248,6 +337,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result Result Result Result Result Result Result Result Result Result Result Result Result SafeKeeperStateV8 { - assert!(state.eviction_state == EvictionState::Present); - SafeKeeperStateV8 { +// Used as a temp hack to make forward compatibility test work. Should be +// removed after PR adding v10 is merged. +pub fn downgrade_v10_to_v9(state: &TimelinePersistentState) -> TimelinePersistentStateV9 { + assert!(state.mconf.generation == INVALID_GENERATION); + TimelinePersistentStateV9 { tenant_id: state.tenant_id, timeline_id: state.timeline_id, acceptor_state: state.acceptor_state.clone(), @@ -426,8 +542,9 @@ pub fn downgrade_v9_to_v8(state: &TimelinePersistentState) -> SafeKeeperStateV8 backup_lsn: state.backup_lsn, peer_horizon_lsn: state.peer_horizon_lsn, remote_consistent_lsn: state.remote_consistent_lsn, - peers: state.peers.clone(), + peers: PersistedPeers(vec![]), partial_backup: state.partial_backup.clone(), + eviction_state: state.eviction_state, } } @@ -437,7 +554,7 @@ mod tests { use utils::{id::NodeId, Hex}; - use crate::safekeeper::PersistedPeerInfo; + use crate::control_file_upgrade::PersistedPeerInfo; use super::*; diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index 28ef2b1d23f1..10a761e1f51d 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -1,6 +1,7 @@ use anyhow::{bail, Result}; use camino::Utf8PathBuf; use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE}; +use safekeeper_api::membership::Configuration; use std::sync::Arc; use tokio::{ fs::OpenOptions, @@ -147,10 +148,10 @@ pub async fn handle_request( let mut new_state = TimelinePersistentState::new( &request.destination_ttid, + Configuration::empty(), state.server.clone(), - vec![], - request.until_lsn, start_lsn, + request.until_lsn, )?; new_state.timeline_start_lsn = start_lsn; new_state.peer_horizon_lsn = request.until_lsn; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 9bc1bf340919..69efdc378379 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -111,14 +111,15 @@ async fn timeline_create_handler(mut request: Request) -> Result NetworkReader<'_, IO> { }; let tli = self .global_timelines - .create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID) + .create( + self.ttid, + Configuration::empty(), + server_info, + Lsn::INVALID, + Lsn::INVALID, + ) .await .context("create timeline")?; tli.wal_residence_guard().await? 
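Note (editorial, not part of the patch): for orientation, a minimal sketch of how a caller could build the `mconf` that the new creation path above takes, using only the types introduced in libs/safekeeper_api/src/membership.rs. The node id, host and port values are hypothetical.

    // Sketch only: builds a single-member Configuration and shows its JSON form.
    use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperId, INITIAL_GENERATION};
    use utils::id::NodeId;

    fn example_mconf() -> anyhow::Result<String> {
        let members = MemberSet::new(vec![SafekeeperId {
            id: NodeId(1),                        // hypothetical node id
            host: "sk-1.example.com".to_string(), // hypothetical host
            pg_port: 5432,
        }])?;
        let mconf = Configuration {
            generation: INITIAL_GENERATION,
            members,
            new_members: None, // Some(..) would make this a joint configuration
        };
        // With the serde derives above this serializes as:
        // {"generation":1,"members":[{"id":1,"host":"sk-1.example.com","pg_port":5432}],"new_members":null}
        Ok(serde_json::to_string(&mconf)?)
    }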
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 6ceaf325b049..06403228e952 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -7,7 +7,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::{TimeLineID, MAX_SEND_SIZE}; use safekeeper_api::models::HotStandbyFeedback; use safekeeper_api::Term; -use safekeeper_api::INVALID_TERM; use serde::{Deserialize, Serialize}; use std::cmp::max; use std::cmp::min; @@ -193,36 +192,6 @@ impl AcceptorState { } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct PersistedPeerInfo { - /// LSN up to which safekeeper offloaded WAL to s3. - pub backup_lsn: Lsn, - /// Term of the last entry. - pub term: Term, - /// LSN of the last record. - pub flush_lsn: Lsn, - /// Up to which LSN safekeeper regards its WAL as committed. - pub commit_lsn: Lsn, -} - -impl PersistedPeerInfo { - pub fn new() -> Self { - Self { - backup_lsn: Lsn::INVALID, - term: INVALID_TERM, - flush_lsn: Lsn(0), - commit_lsn: Lsn(0), - } - } -} - -// make clippy happy -impl Default for PersistedPeerInfo { - fn default() -> Self { - Self::new() - } -} - // protocol messages /// Initial Proposer -> Acceptor message @@ -1010,7 +979,7 @@ where /// Update commit_lsn from peer safekeeper data. pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> { - if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) { + if Lsn(sk_info.commit_lsn) != Lsn::INVALID { // Note: the check is too restrictive, generally we can update local // commit_lsn if our history matches (is part of) history of advanced // commit_lsn provider. @@ -1025,12 +994,20 @@ where #[cfg(test)] mod tests { use futures::future::BoxFuture; + use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE}; - use safekeeper_api::ServerInfo; + use safekeeper_api::{ + membership::{Configuration, MemberSet, SafekeeperId}, + ServerInfo, + }; use super::*; - use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState}; - use std::{ops::Deref, str::FromStr, time::Instant}; + use crate::state::{EvictionState, TimelinePersistentState}; + use std::{ + ops::Deref, + str::FromStr, + time::{Instant, UNIX_EPOCH}, + }; // fake storage for tests struct InMemoryState { @@ -1313,12 +1290,21 @@ mod tests { #[test] fn test_sk_state_bincode_serde_roundtrip() { - use utils::Hex; let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(); let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap(); let state = TimelinePersistentState { tenant_id, timeline_id, + mconf: Configuration { + generation: 42, + members: MemberSet::new(vec![SafekeeperId { + id: NodeId(1), + host: "hehe.org".to_owned(), + pg_port: 5432, + }]) + .expect("duplicate member"), + new_members: None, + }, acceptor_state: AcceptorState { term: 42, term_history: TermHistory(vec![TermLsn { @@ -1342,70 +1328,13 @@ mod tests { backup_lsn: Lsn(1234567300), peer_horizon_lsn: Lsn(9999999), remote_consistent_lsn: Lsn(1234560000), - peers: PersistedPeers(vec![( - NodeId(1), - PersistedPeerInfo { - backup_lsn: Lsn(1234567000), - term: 42, - flush_lsn: Lsn(1234567800 - 8), - commit_lsn: Lsn(1234567600), - }, - )]), partial_backup: crate::wal_backup_partial::State::default(), eviction_state: EvictionState::Present, + creation_ts: UNIX_EPOCH, }; let ser = state.ser().unwrap(); - #[rustfmt::skip] - let expected = [ - // tenant_id as length prefixed hex - 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x63, 
0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36, - // timeline_id as length prefixed hex - 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34, - // term - 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // length prefix - 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // unsure why this order is swapped - 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // pg_version - 0x0e, 0x00, 0x00, 0x00, - // systemid - 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12, - // wal_seg_size - 0x78, 0x56, 0x34, 0x12, - // pguuid as length prefixed hex - 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31, - - // timeline_start_lsn - 0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00, - 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, - 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, - 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00, - // length prefix for persistentpeers - 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // nodeid - 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // backuplsn - 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00, - 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, - 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, - // partial_backup - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // eviction_state - 0x00, 0x00, 0x00, 0x00, - ]; - - assert_eq!(Hex(&ser), Hex(&expected)); - let deser = TimelinePersistentState::des(&ser).unwrap(); assert_eq!(deser, state); diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index c6ae6c1d2b0e..1c3bb1b4dc0a 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -1,20 +1,22 @@ //! Defines per timeline data stored persistently (SafeKeeperPersistentState) //! and its wrapper with in memory layer (SafekeeperState). -use std::{cmp::max, ops::Deref}; +use std::{cmp::max, ops::Deref, time::SystemTime}; use anyhow::{bail, Result}; use postgres_ffi::WAL_SEGMENT_SIZE; -use safekeeper_api::{models::TimelineTermBumpResponse, ServerInfo, Term}; +use safekeeper_api::{ + membership::Configuration, models::TimelineTermBumpResponse, ServerInfo, Term, INITIAL_TERM, +}; use serde::{Deserialize, Serialize}; use utils::{ - id::{NodeId, TenantId, TenantTimelineId, TimelineId}, + id::{TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, }; use crate::{ control_file, - safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, TermHistory, UNKNOWN_SERVER_VERSION}, + safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn, UNKNOWN_SERVER_VERSION}, timeline::TimelineError, wal_backup_partial::{self}, }; @@ -27,6 +29,8 @@ pub struct TimelinePersistentState { pub tenant_id: TenantId, #[serde(with = "hex")] pub timeline_id: TimelineId, + /// Membership configuration. 
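+    /// Generation 0 (INVALID_GENERATION) means the timeline predates
+    /// generations; for such timelines the control file is still written in
+    /// the previous v9 format (see control_file.rs above).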
+    pub mconf: Configuration,
     /// persistent acceptor state
     pub acceptor_state: AcceptorState,
     /// information about server
@@ -58,22 +62,15 @@ pub struct TimelinePersistentState {
     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
     /// informational purposes, we receive it from pageserver (or broker).
     pub remote_consistent_lsn: Lsn,
-    /// Peers and their state as we remember it. Knowing peers themselves is
-    /// fundamental; but state is saved here only for informational purposes and
-    /// obviously can be stale. (Currently not saved at all, but let's provision
-    /// place to have less file version upgrades).
-    pub peers: PersistedPeers,
     /// Holds names of partial segments uploaded to remote storage. Used to
     /// clean up old objects without leaving garbage in remote storage.
     pub partial_backup: wal_backup_partial::State,
     /// Eviction state of the timeline. If it's Offloaded, we should download
     /// WAL files from remote storage to serve the timeline.
     pub eviction_state: EvictionState,
+    pub creation_ts: SystemTime,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
-pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
-
 /// State of the local WAL files. Used to track current timeline state,
 /// that can be either WAL files are present on disk or last partial segment
 /// is offloaded to remote storage.
@@ -87,12 +84,14 @@ pub enum EvictionState {
 }
 
 impl TimelinePersistentState {
+    /// commit_lsn is the same as start_lsn in the normal creation; see
+    /// `TimelineCreateRequest` comments.
     pub fn new(
         ttid: &TenantTimelineId,
+        mconf: Configuration,
         server_info: ServerInfo,
-        peers: Vec<NodeId>,
+        start_lsn: Lsn,
         commit_lsn: Lsn,
-        local_start_lsn: Lsn,
     ) -> anyhow::Result<Self> {
         if server_info.wal_seg_size == 0 {
             bail!(TimelineError::UninitializedWalSegSize(*ttid));
         }
@@ -102,49 +101,59 @@ impl TimelinePersistentState {
             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
         }
 
-        if commit_lsn < local_start_lsn {
+        if commit_lsn < start_lsn {
             bail!(
-                "commit_lsn {} is smaller than local_start_lsn {}",
+                "commit_lsn {} is smaller than start_lsn {}",
                 commit_lsn,
-                local_start_lsn
+                start_lsn
             );
         }
 
+        // If we are given an init LSN, initialize the term history with it.
+        // This ensures that the walproposer is always able to find a common
+        // point in histories; if it can't, something is corrupted. Not having
+        // an LSN here is left for the legacy case where the timeline is
+        // created by compute and the LSN is not yet known at creation time.
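+        // E.g. a timeline created with a real commit_lsn/start_lsn (say
+        // 0/1000000) gets a single (INITIAL_TERM, start_lsn) entry below,
+        // while Lsn::INVALID leaves the term history empty.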
+ let term_history = if commit_lsn != Lsn::INVALID { + TermHistory(vec![TermLsn { + term: INITIAL_TERM, + lsn: start_lsn, + }]) + } else { + TermHistory::empty() + }; + Ok(TimelinePersistentState { tenant_id: ttid.tenant_id, timeline_id: ttid.timeline_id, + mconf, acceptor_state: AcceptorState { - term: 0, - term_history: TermHistory::empty(), + term: INITIAL_TERM, + term_history, }, server: server_info, proposer_uuid: [0; 16], - timeline_start_lsn: Lsn(0), - local_start_lsn, + timeline_start_lsn: start_lsn, + local_start_lsn: start_lsn, commit_lsn, - backup_lsn: local_start_lsn, - peer_horizon_lsn: local_start_lsn, + backup_lsn: start_lsn, + peer_horizon_lsn: start_lsn, remote_consistent_lsn: Lsn(0), - peers: PersistedPeers( - peers - .iter() - .map(|p| (*p, PersistedPeerInfo::new())) - .collect(), - ), partial_backup: wal_backup_partial::State::default(), eviction_state: EvictionState::Present, + creation_ts: SystemTime::now(), }) } pub fn empty() -> Self { TimelinePersistentState::new( &TenantTimelineId::empty(), + Configuration::empty(), ServerInfo { pg_version: 170000, /* Postgres server version (major * 10000) */ system_id: 0, /* Postgres system identifier */ wal_seg_size: WAL_SEGMENT_SIZE as u32, }, - vec![], Lsn::INVALID, Lsn::INVALID, ) diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index ad29c9f66c2c..a701534f65b4 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -12,6 +12,7 @@ use crate::{control_file, wal_storage, SafeKeeperConf}; use anyhow::{bail, Context, Result}; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; +use safekeeper_api::membership::Configuration; use safekeeper_api::ServerInfo; use serde::Serialize; use std::collections::HashMap; @@ -214,9 +215,10 @@ impl GlobalTimelines { pub(crate) async fn create( &self, ttid: TenantTimelineId, + mconf: Configuration, server_info: ServerInfo, + start_lsn: Lsn, commit_lsn: Lsn, - local_start_lsn: Lsn, ) -> Result> { let (conf, _, _) = { let state = self.state.lock().unwrap(); @@ -239,8 +241,7 @@ impl GlobalTimelines { // TODO: currently we create only cfile. It would be reasonable to // immediately initialize first WAL segment as well. 
- let state = - TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?; + let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?; control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?; let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?; Ok(timeline) diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index efcdd89e7da7..a99de71a041b 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -21,7 +21,7 @@ use safekeeper::{ wal_storage::Storage, SafeKeeperConf, }; -use safekeeper_api::ServerInfo; +use safekeeper_api::{membership::Configuration, ServerInfo}; use tracing::{debug, info_span, warn}; use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -96,8 +96,13 @@ impl GlobalMap { let commit_lsn = Lsn::INVALID; let local_start_lsn = Lsn::INVALID; - let state = - TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?; + let state = TimelinePersistentState::new( + &ttid, + Configuration::empty(), + server_info, + commit_lsn, + local_start_lsn, + )?; let disk_timeline = self.disk.put_state(&ttid, state); let control_store = DiskStateStorage::new(disk_timeline.clone()); diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index eabdeb10539c..0e9f1fadda4f 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -15,7 +15,6 @@ from urllib3.util.retry import Retry from fixtures.common_types import ( - Id, Lsn, TenantId, TenantShardId, @@ -25,7 +24,7 @@ from fixtures.log_helper import log from fixtures.metrics import Metrics, MetricsGetter, parse_metrics from fixtures.pg_version import PgVersion -from fixtures.utils import Fn +from fixtures.utils import EnhancedJSONEncoder, Fn class PageserverApiException(Exception): @@ -83,14 +82,6 @@ class TimelineCreateRequest: mode: TimelineCreateRequestMode def to_json(self) -> str: - class EnhancedJSONEncoder(json.JSONEncoder): - def default(self, o): - if dataclasses.is_dataclass(o) and not isinstance(o, type): - return dataclasses.asdict(o) - elif isinstance(o, Id): - return o.id.hex() - return super().default(o) - # mode is flattened this = dataclasses.asdict(self) mode = this.pop("mode") diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 286f80ba69f1..4826cae3ee13 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -10,7 +10,7 @@ from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId from fixtures.log_helper import log from fixtures.metrics import Metrics, MetricsGetter, parse_metrics -from fixtures.utils import wait_until +from fixtures.utils import EnhancedJSONEncoder, wait_until if TYPE_CHECKING: from typing import Any @@ -69,6 +69,34 @@ def from_json(cls, d: dict[str, Any]) -> TermBumpResponse: ) +@dataclass +class SafekeeperId: + id: int + host: str + pg_port: str + + +@dataclass +class Configuration: + generation: int + members: list[SafekeeperId] + new_members: list[SafekeeperId] | None + + +@dataclass +class TimelineCreateRequest: + tenant_id: TenantId + timeline_id: TimelineId + mconf: Configuration + # not exactly PgVersion, for example 150002 for 15.2 + pg_version: int + start_lsn: Lsn + commit_lsn: Lsn | None + + def to_json(self) -> str: + return 
json.dumps(self, cls=EnhancedJSONEncoder)
+
+
 class SafekeeperHttpClient(requests.Session, MetricsGetter):
     HTTPError = requests.HTTPError
 
@@ -131,20 +159,8 @@ def timeline_list(self) -> list[TenantTimelineId]:
         resj = res.json()
         return [TenantTimelineId.from_json(ttidj) for ttidj in resj]
 
-    def timeline_create(
-        self,
-        tenant_id: TenantId,
-        timeline_id: TimelineId,
-        pg_version: int,  # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
-        commit_lsn: Lsn,
-    ):
-        body = {
-            "tenant_id": str(tenant_id),
-            "timeline_id": str(timeline_id),
-            "pg_version": pg_version,
-            "commit_lsn": str(commit_lsn),
-        }
-        res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
+    def timeline_create(self, r: TimelineCreateRequest):
+        res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", data=r.to_json())
         res.raise_for_status()
 
     def timeline_status(
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index c34ac298d1cc..e160c617cdda 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import contextlib
+import dataclasses
 import json
 import os
 import re
@@ -21,6 +22,7 @@
 from psycopg2.extensions import cursor
 from typing_extensions import override
 
+from fixtures.common_types import Id, Lsn
 from fixtures.log_helper import log
 from fixtures.pageserver.common_types import (
     parse_delta_layer,
@@ -605,6 +607,22 @@ def join(self, timeout: float | None = None) -> Any:
         return self.ret
 
 
+class EnhancedJSONEncoder(json.JSONEncoder):
+    """
+    Default json.JSONEncoder works only on primitive builtins. Extend it to any
+    dataclass plus our custom types.
+    """
+
+    def default(self, o):
+        if dataclasses.is_dataclass(o) and not isinstance(o, type):
+            return dataclasses.asdict(o)
+        elif isinstance(o, Id):
+            return o.id.hex()
+        elif isinstance(o, Lsn):
+            return str(o)  # standard hex notation
+        return super().default(o)
+
+
 def human_bytes(amt: float) -> str:
     """
     Render a bytes amount into nice IEC bytes string.
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 0a8900b351e4..d39c6a6b5bef 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -48,7 +48,7 @@
     default_remote_storage,
     s3_storage,
 )
-from fixtures.safekeeper.http import SafekeeperHttpClient
+from fixtures.safekeeper.http import Configuration, SafekeeperHttpClient, TimelineCreateRequest
 from fixtures.safekeeper.utils import wait_walreceivers_absent
 from fixtures.utils import (
     PropagatingThread,
@@ -658,7 +658,13 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
     for sk in env.safekeepers:
         sk.start()
         cli = sk.http_client()
-        cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
+        mconf = Configuration(generation=0, members=[], new_members=None)
+        # Set start_lsn to the beginning of the first segment to allow reading
+        # WAL from there (you could use the initdb LSN as well).
+        r = TimelineCreateRequest(
+            tenant_id, timeline_id, mconf, pg_version, Lsn("0/1000000"), commit_lsn=last_lsn
+        )
+        cli.timeline_create(r)
         f_partial_path = (
             Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
         )
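Note (editorial, not part of the patch): with the dataclasses and EnhancedJSONEncoder above, the request built in test_s3_wal_replay serializes roughly as below. The ids and the commit_lsn value are illustrative, and 150002 is just the example version format mentioned in the fixture comment.

    # Roughly what r.to_json() produces (illustrative values):
    # {
    #   "tenant_id": "<tenant id hex>",
    #   "timeline_id": "<timeline id hex>",
    #   "mconf": {"generation": 0, "members": [], "new_members": null},
    #   "pg_version": 150002,
    #   "start_lsn": "0/1000000",
    #   "commit_lsn": "<str(last_lsn)>"
    # }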