Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sk membership in cf #10195

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions libs/safekeeper_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ edition.workspace = true
license.workspace = true

[dependencies]
anyhow.workspace = true
const_format.workspace = true
serde.workspace = true
serde_json.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
tokio.workspace = true
Expand Down
5 changes: 4 additions & 1 deletion libs/safekeeper_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use const_format::formatcp;
use pq_proto::SystemId;
use serde::{Deserialize, Serialize};

pub mod membership;
/// Public API types
pub mod models;

/// Consensus logical timestamp. Note: it is a part of sk control file.
pub type Term = u64;
pub const INVALID_TERM: Term = 0;
/// With this term timeline is created initially. It
/// is a normal term except wp is never elected with it.
pub const INITIAL_TERM: Term = 0;

/// Information about Postgres. Safekeeper gets it once and then verifies all
/// further connections from computes match. Note: it is a part of sk control
Expand Down
163 changes: 163 additions & 0 deletions libs/safekeeper_api/src/membership.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//! Types defining safekeeper membership, see
//! rfcs/035-safekeeper-dynamic-membership-change.md
//! for details.

use std::{collections::HashSet, fmt::Display};

use anyhow;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use utils::id::NodeId;

/// Number uniquely identifying safekeeper configuration.
/// Note: it is a part of sk control file.
pub type Generation = u32;
/// 1 is the first valid generation, 0 is used as
/// a placeholder before we fully migrate to generations.
pub const INVALID_GENERATION: Generation = 0;

/// Membership is defined by ids so e.g. walproposer uses them to figure out
/// quorums, but we also carry host and port to give wp idea where to connect.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SafekeeperId {
pub id: NodeId,
pub host: String,
pub pg_port: u16,
}

impl Display for SafekeeperId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[id={}, ep={}:{}]", self.id, self.host, self.pg_port)
}
}

/// Set of safekeepers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(transparent)]
pub struct MemberSet {
pub members: Vec<SafekeeperId>,
}

impl MemberSet {
pub fn empty() -> Self {
MemberSet {
members: Vec::new(),
}
}

pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
let hs: HashSet<NodeId> = HashSet::from_iter(members.iter().map(|sk| sk.id));
if hs.len() != members.len() {
bail!("duplicate safekeeper id in the set {:?}", members);
}
Ok(MemberSet { members })
}

pub fn contains(&self, sk: &SafekeeperId) -> bool {
self.members.iter().any(|m| m.id == sk.id)
}

pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
if self.contains(&sk) {
bail!(format!(
"sk {} is already member of the set {}",
sk.id, self
));
}
self.members.push(sk);
Ok(())
}
}

impl Display for MemberSet {
/// Display as a comma separated list of members.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let sks_str = self
.members
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>();
write!(f, "({})", sks_str.join(", "))
}
}

/// Safekeeper membership configuration.
/// Note: it is a part of both control file and http API.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Configuration {
/// Unique id.
pub generation: Generation,
/// Current members of the configuration.
pub members: MemberSet,
/// Some means it is a joint conf.
pub new_members: Option<MemberSet>,
}

impl Configuration {
/// Used for pre-generations timelines, will be removed eventually.
pub fn empty() -> Self {
Configuration {
generation: INVALID_GENERATION,
members: MemberSet::empty(),
new_members: None,
}
}
}

impl Display for Configuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"gen={}, members={}, new_members={}",
self.generation,
self.members,
self.new_members
.as_ref()
.map(ToString::to_string)
.unwrap_or(String::from("none"))
)
}
}

#[cfg(test)]
mod tests {
use super::{MemberSet, SafekeeperId};
use utils::id::NodeId;

#[test]
fn test_member_set() {
let mut members = MemberSet::empty();
members
.add(SafekeeperId {
id: NodeId(42),
host: String::from("lala.org"),
pg_port: 5432,
})
.unwrap();

members
.add(SafekeeperId {
id: NodeId(42),
host: String::from("lala.org"),
pg_port: 5432,
})
.expect_err("duplicate must not be allowed");

members
.add(SafekeeperId {
id: NodeId(43),
host: String::from("bubu.org"),
pg_port: 5432,
})
.unwrap();

println!("members: {}", members);

let j = serde_json::to_string(&members).expect("failed to serialize");
println!("members json: {}", j);
assert_eq!(
j,
r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#
);
}
}
13 changes: 8 additions & 5 deletions libs/safekeeper_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use utils::{
pageserver_feedback::PageserverFeedback,
};

use crate::{ServerInfo, Term};
use crate::{membership::Configuration, ServerInfo, Term};

#[derive(Debug, Serialize)]
pub struct SafekeeperStatus {
Expand All @@ -22,13 +22,16 @@ pub struct SafekeeperStatus {
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub peer_ids: Option<Vec<NodeId>>,
pub mconf: Configuration,
pub pg_version: u32,
pub system_id: Option<u64>,
// By default WAL_SEGMENT_SIZE
pub wal_seg_size: Option<u32>,
pub commit_lsn: Lsn,
// If not passed, it is assigned to the beginning of commit_lsn segment.
pub local_start_lsn: Option<Lsn>,
pub start_lsn: Lsn,
// Normal creation should omit this field (start_lsn initializes all LSNs).
// However, we allow specifying custom value higher than start_lsn for
// manual recovery case, see test_s3_wal_replay.
pub commit_lsn: Option<Lsn>,
}

/// Same as TermLsn, but serializes LSN using display serializer
Expand Down
20 changes: 14 additions & 6 deletions safekeeper/src/control_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use camino::{Utf8Path, Utf8PathBuf};
use safekeeper_api::membership::INVALID_GENERATION;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use utils::crashsafe::durable_rename;
Expand All @@ -13,14 +14,14 @@ use std::ops::Deref;
use std::path::Path;
use std::time::Instant;

use crate::control_file_upgrade::downgrade_v9_to_v8;
use crate::control_file_upgrade::downgrade_v10_to_v9;
use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::state::{EvictionState, TimelinePersistentState};
use utils::bin_ser::LeSer;

pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 9;
pub const SK_FORMAT_VERSION: u32 = 10;

// contains persistent metadata for safekeeper
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
Expand Down Expand Up @@ -169,10 +170,11 @@ impl TimelinePersistentState {
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;

if self.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(self);
if self.mconf.generation == INVALID_GENERATION {
// Temp hack for forward compatibility test: in case of none
// configuration save cfile in previous v9 format.
const PREV_FORMAT_VERSION: u32 = 9;
let prev = downgrade_v10_to_v9(self);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
Expand Down Expand Up @@ -233,6 +235,7 @@ impl Storage for FileStorage {
#[cfg(test)]
mod test {
use super::*;
use safekeeper_api::membership::{Configuration, MemberSet};
use tokio::fs;
use utils::lsn::Lsn;

Expand All @@ -242,6 +245,11 @@ mod test {
async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
let tempdir = camino_tempfile::tempdir()?;
let mut state = TimelinePersistentState::empty();
state.mconf = Configuration {
generation: 42,
members: MemberSet::empty(),
new_members: None,
};
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;

// Make a change.
Expand Down
Loading
Loading