Skip to content

Commit

Permalink
Add safekeeper membership conf to control file.
Browse files Browse the repository at this point in the history
  • Loading branch information
arssher committed Dec 19, 2024
1 parent e226d7a commit 8b7de61
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 140 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions libs/safekeeper_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ edition.workspace = true
license.workspace = true

[dependencies]
anyhow.workspace = true
const_format.workspace = true
serde.workspace = true
serde_json.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
tokio.workspace = true
Expand Down
1 change: 1 addition & 0 deletions libs/safekeeper_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use const_format::formatcp;
use pq_proto::SystemId;
use serde::{Deserialize, Serialize};

pub mod membership;
/// Public API types
pub mod models;

Expand Down
163 changes: 163 additions & 0 deletions libs/safekeeper_api/src/membership.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//! Types defining safekeeper membership, see
//! rfcs/035-safekeeper-dynamic-membership-change.md
//! for details.
use std::{collections::HashSet, fmt::Display};

use anyhow;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use utils::id::NodeId;

/// Number uniquely identifying safekeeper configuration.
/// Note: it is a part of sk control file.
pub type Generation = u32;
/// 1 is the first valid generation, 0 is used as
/// a placeholder before we fully migrate to generations.
pub const INVALID_GENERATION: Generation = 0;

/// Membership is defined by ids so e.g. walproposer uses them to figure out
/// quorums, but we also carry host and port to give wp idea where to connect.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SafekeeperId {
pub id: NodeId,
pub host: String,
pub pg_port: u16,
}

impl Display for SafekeeperId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[id={}, ep={}:{}]", self.id, self.host, self.pg_port)
}
}

/// Set of safekeepers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(transparent)]
pub struct MemberSet {
pub members: Vec<SafekeeperId>,
}

impl MemberSet {
pub fn empty() -> Self {
MemberSet {
members: Vec::new(),
}
}

pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
let hs: HashSet<NodeId> = HashSet::from_iter(members.iter().map(|sk| sk.id));
if hs.len() != members.len() {
bail!("duplicate safekeeper id in the set {:?}", members);
}
Ok(MemberSet { members })
}

pub fn contains(&self, sk: &SafekeeperId) -> bool {
self.members.iter().any(|m| m.id == sk.id)
}

pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
if self.contains(&sk) {
bail!(format!(
"sk {} is already member of the set {}",
sk.id, self
));
}
self.members.push(sk);
Ok(())
}
}

impl Display for MemberSet {
/// Display as a comma separated list of members.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let sks_str = self
.members
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>();
write!(f, "({})", sks_str.join(", "))
}
}

/// Safekeeper membership configuration.
/// Note: it is a part of both control file and http API.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Configuration {
/// Unique id.
pub generation: Generation,
/// Current members of the configuration.
pub members: MemberSet,
/// Some means it is a joint conf.
pub new_members: Option<MemberSet>,
}

impl Configuration {
/// Used for pre-generations timelines, will be removed eventually.
pub fn empty() -> Self {
Configuration {
generation: INVALID_GENERATION,
members: MemberSet::empty(),
new_members: None,
}
}
}

impl Display for Configuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"gen={}, members={}, new_members={}",
self.generation,
self.members,
self.new_members
.as_ref()
.map(ToString::to_string)
.unwrap_or(String::from("none"))
)
}
}

#[cfg(test)]
mod tests {
use super::{MemberSet, SafekeeperId};
use utils::id::NodeId;

#[test]
fn test_member_set() {
let mut members = MemberSet::empty();
members
.add(SafekeeperId {
id: NodeId(42),
host: String::from("lala.org"),
pg_port: 5432,
})
.unwrap();

members
.add(SafekeeperId {
id: NodeId(42),
host: String::from("lala.org"),
pg_port: 5432,
})
.expect_err("duplicate must not be allowed");

members
.add(SafekeeperId {
id: NodeId(43),
host: String::from("bubu.org"),
pg_port: 5432,
})
.unwrap();

println!("members: {}", members);

let j = serde_json::to_string(&members).expect("failed to serialize");
println!("members json: {}", j);
assert_eq!(
j,
r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#
);
}
}
4 changes: 2 additions & 2 deletions libs/safekeeper_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use utils::{
pageserver_feedback::PageserverFeedback,
};

use crate::{ServerInfo, Term};
use crate::{membership::Configuration, ServerInfo, Term};

#[derive(Debug, Serialize)]
pub struct SafekeeperStatus {
Expand All @@ -22,7 +22,7 @@ pub struct SafekeeperStatus {
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub peer_ids: Option<Vec<NodeId>>,
pub mconf: Configuration,
pub pg_version: u32,
pub system_id: Option<u64>,
pub wal_seg_size: Option<u32>,
Expand Down
20 changes: 14 additions & 6 deletions safekeeper/src/control_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use camino::{Utf8Path, Utf8PathBuf};
use safekeeper_api::membership::INVALID_GENERATION;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use utils::crashsafe::durable_rename;
Expand All @@ -13,14 +14,14 @@ use std::ops::Deref;
use std::path::Path;
use std::time::Instant;

use crate::control_file_upgrade::downgrade_v9_to_v8;
use crate::control_file_upgrade::downgrade_v10_to_v9;
use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::state::{EvictionState, TimelinePersistentState};
use utils::bin_ser::LeSer;

pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 9;
pub const SK_FORMAT_VERSION: u32 = 10;

// contains persistent metadata for safekeeper
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
Expand Down Expand Up @@ -169,10 +170,11 @@ impl TimelinePersistentState {
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;

if self.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(self);
if self.mconf.generation == INVALID_GENERATION {
// Temp hack for forward compatibility test: in case of none
// configuration save cfile in previous v9 format.
const PREV_FORMAT_VERSION: u32 = 9;
let prev = downgrade_v10_to_v9(self);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
Expand Down Expand Up @@ -233,6 +235,7 @@ impl Storage for FileStorage {
#[cfg(test)]
mod test {
use super::*;
use safekeeper_api::membership::{Configuration, MemberSet};
use tokio::fs;
use utils::lsn::Lsn;

Expand All @@ -242,6 +245,11 @@ mod test {
async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
let tempdir = camino_tempfile::tempdir()?;
let mut state = TimelinePersistentState::empty();
state.mconf = Configuration {
generation: 42,
members: MemberSet::empty(),
new_members: None,
};
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;

// Make a change.
Expand Down
Loading

0 comments on commit 8b7de61

Please sign in to comment.