From b3d3a2587d80bc541403dc47d9b0729ab6e61b5d Mon Sep 17 00:00:00 2001 From: duguorong009 <80258679+duguorong009@users.noreply.github.com> Date: Mon, 6 Nov 2023 04:40:03 -0500 Subject: [PATCH] feat: improve the serde impl for several types(`Lsn`, `TenantId`, `TimelineId` ...) (#5335) Improve the serde impl for several types (`Lsn`, `TenantId`, `TimelineId`) by making them sensitive to `Serializer::is_human_readable` (true for json, false for bincode). Fixes #3511 by: - Implement the custom serde for `Lsn` - Implement the custom serde for `Id` - Add the helper module `serde_as_u64` in `libs/utils/src/lsn.rs` - Remove the unnecessary attr `#[serde_as(as = "DisplayFromStr")]` in all possible structs Additionally some safekeeper types gained serde tests. --------- Co-authored-by: Joonas Koivunen --- Cargo.lock | 11 + Cargo.toml | 1 + control_plane/src/attachment_service.rs | 3 - control_plane/src/endpoint.rs | 4 - control_plane/src/local_env.rs | 4 - libs/compute_api/src/spec.rs | 11 +- libs/pageserver_api/src/control_api.rs | 7 - libs/pageserver_api/src/models.rs | 47 +--- libs/safekeeper_api/src/models.rs | 12 - libs/utils/Cargo.toml | 1 + libs/utils/src/auth.rs | 3 - libs/utils/src/hex.rs | 41 +++ libs/utils/src/id.rs | 180 ++++++++++++- libs/utils/src/lib.rs | 4 + libs/utils/src/lsn.rs | 206 +++++++++++++- libs/utils/src/pageserver_feedback.rs | 5 - pageserver/src/consumption_metrics/metrics.rs | 4 - pageserver/src/consumption_metrics/upload.rs | 4 - pageserver/src/deletion_queue.rs | 3 - pageserver/src/http/routes.rs | 5 - pageserver/src/tenant/metadata.rs | 119 +++++++++ .../tenant/remote_timeline_client/index.rs | 3 - pageserver/src/tenant/size.rs | 13 - pageserver/src/tenant/timeline.rs | 3 - s3_scrubber/src/cloud_admin_api.rs | 21 +- safekeeper/src/control_file_upgrade.rs | 252 +++++++++++++++++- safekeeper/src/debug_dump.rs | 4 - safekeeper/src/http/routes.rs | 13 - safekeeper/src/json_ctrl.rs | 3 + safekeeper/src/pull_timeline.rs | 5 - 
safekeeper/src/safekeeper.rs | 114 +++++++- safekeeper/src/send_wal.rs | 3 - safekeeper/src/timeline.rs | 6 - 33 files changed, 930 insertions(+), 185 deletions(-) create mode 100644 libs/utils/src/hex.rs diff --git a/Cargo.lock b/Cargo.lock index 2203ad462c28..a7e88688ce0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4681,6 +4681,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_assert" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eda563240c1288b044209be1f0d38bb4d15044fb3e00dc354fbc922ab4733e80" +dependencies = [ + "hashbrown 0.13.2", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.183" @@ -5967,6 +5977,7 @@ dependencies = [ "routerify", "sentry", "serde", + "serde_assert", "serde_json", "serde_with", "signal-hook", diff --git a/Cargo.toml b/Cargo.toml index 6f538941f95b..d992db4858d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,6 +124,7 @@ sentry = { version = "0.31", default-features = false, features = ["backtrace", serde = { version = "1.0", features = ["derive"] } serde_json = "1" serde_with = "2.0" +serde_assert = "0.5.0" sha2 = "0.10.2" signal-hook = "0.3" smallvec = "1.11" diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index ca632c5eb676..fcefe0e43189 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -2,7 +2,6 @@ use crate::{background_process, local_env::LocalEnv}; use anyhow::anyhow; use camino::Utf8PathBuf; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use std::{path::PathBuf, process::Child}; use utils::id::{NodeId, TenantId}; @@ -14,10 +13,8 @@ pub struct AttachmentService { const COMMAND: &str = "attachment_service"; -#[serde_as] #[derive(Serialize, Deserialize)] pub struct AttachHookRequest { - #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, pub node_id: Option, } diff --git a/control_plane/src/endpoint.rs 
b/control_plane/src/endpoint.rs index cb16f48829dc..4443fd870432 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -46,7 +46,6 @@ use std::time::Duration; use anyhow::{anyhow, bail, Context, Result}; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use utils::id::{NodeId, TenantId, TimelineId}; use crate::local_env::LocalEnv; @@ -57,13 +56,10 @@ use compute_api::responses::{ComputeState, ComputeStatus}; use compute_api::spec::{Cluster, ComputeMode, ComputeSpec}; // contents of a endpoint.json file -#[serde_as] #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] pub struct EndpointConf { endpoint_id: String, - #[serde_as(as = "DisplayFromStr")] tenant_id: TenantId, - #[serde_as(as = "DisplayFromStr")] timeline_id: TimelineId, mode: ComputeMode, pg_port: u16, diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 45a7469787e5..b9c8aeddcb80 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -8,7 +8,6 @@ use anyhow::{bail, ensure, Context}; use postgres_backend::AuthType; use reqwest::Url; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use std::collections::HashMap; use std::env; use std::fs; @@ -33,7 +32,6 @@ pub const DEFAULT_PG_VERSION: u32 = 15; // to 'neon_local init --config=' option. See control_plane/simple.conf for // an example. // -#[serde_as] #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] pub struct LocalEnv { // Base directory for all the nodes (the pageserver, safekeepers and @@ -59,7 +57,6 @@ pub struct LocalEnv { // Default tenant ID to use with the 'neon_local' command line utility, when // --tenant_id is not explicitly specified. 
#[serde(default)] - #[serde_as(as = "Option")] pub default_tenant_id: Option, // used to issue tokens during e.g pg start @@ -84,7 +81,6 @@ pub struct LocalEnv { // A `HashMap>` would be more appropriate here, // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error. // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table". - #[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")] branch_name_mappings: HashMap>, } diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index c16deceebbe4..175b4461ac42 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -6,7 +6,6 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -19,7 +18,6 @@ pub type PgIdent = String; /// Cluster spec or configuration represented as an optional number of /// delta operations + final cluster state description. -#[serde_as] #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct ComputeSpec { pub format_version: f32, @@ -50,12 +48,12 @@ pub struct ComputeSpec { // these, and instead set the "neon.tenant_id", "neon.timeline_id", // etc. GUCs in cluster.settings. TODO: Once the control plane has been // updated to fill these fields, we can make these non optional. 
- #[serde_as(as = "Option")] pub tenant_id: Option, - #[serde_as(as = "Option")] + pub timeline_id: Option, - #[serde_as(as = "Option")] + pub pageserver_connstring: Option, + #[serde(default)] pub safekeeper_connstrings: Vec, @@ -140,14 +138,13 @@ impl RemoteExtSpec { } } -#[serde_as] #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)] pub enum ComputeMode { /// A read-write node #[default] Primary, /// A read-only node, pinned at a particular LSN - Static(#[serde_as(as = "DisplayFromStr")] Lsn), + Static(Lsn), /// A read-only node that follows the tip of the branch in hot standby mode /// /// Future versions may want to distinguish between replicas with hot standby diff --git a/libs/pageserver_api/src/control_api.rs b/libs/pageserver_api/src/control_api.rs index 3819111a5b98..8232e81b9887 100644 --- a/libs/pageserver_api/src/control_api.rs +++ b/libs/pageserver_api/src/control_api.rs @@ -4,7 +4,6 @@ //! See docs/rfcs/025-generation-numbers.md use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use utils::id::{NodeId, TenantId}; #[derive(Serialize, Deserialize)] @@ -12,10 +11,8 @@ pub struct ReAttachRequest { pub node_id: NodeId, } -#[serde_as] #[derive(Serialize, Deserialize)] pub struct ReAttachResponseTenant { - #[serde_as(as = "DisplayFromStr")] pub id: TenantId, pub gen: u32, } @@ -25,10 +22,8 @@ pub struct ReAttachResponse { pub tenants: Vec, } -#[serde_as] #[derive(Serialize, Deserialize)] pub struct ValidateRequestTenant { - #[serde_as(as = "DisplayFromStr")] pub id: TenantId, pub gen: u32, } @@ -43,10 +38,8 @@ pub struct ValidateResponse { pub tenants: Vec, } -#[serde_as] #[derive(Serialize, Deserialize)] pub struct ValidateResponseTenant { - #[serde_as(as = "DisplayFromStr")] pub id: TenantId, pub valid: bool, } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index e667645c0a26..cb99dc0a5530 100644 --- a/libs/pageserver_api/src/models.rs +++ 
b/libs/pageserver_api/src/models.rs @@ -6,7 +6,7 @@ use std::{ use byteorder::{BigEndian, ReadBytesExt}; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; +use serde_with::serde_as; use strum_macros; use utils::{ completion, @@ -174,25 +174,19 @@ pub enum TimelineState { Broken { reason: String, backtrace: String }, } -#[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { - #[serde_as(as = "DisplayFromStr")] pub new_timeline_id: TimelineId, #[serde(default)] - #[serde_as(as = "Option")] pub ancestor_timeline_id: Option, #[serde(default)] - #[serde_as(as = "Option")] pub ancestor_start_lsn: Option, pub pg_version: Option, } -#[serde_as] #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantCreateRequest { - #[serde_as(as = "DisplayFromStr")] pub new_tenant_id: TenantId, #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] @@ -201,7 +195,6 @@ pub struct TenantCreateRequest { pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it } -#[serde_as] #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantLoadRequest { @@ -278,31 +271,26 @@ pub struct LocationConfig { pub tenant_conf: TenantConfig, } -#[serde_as] #[derive(Serialize, Deserialize)] #[serde(transparent)] -pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId); +pub struct TenantCreateResponse(pub TenantId); #[derive(Serialize)] pub struct StatusResponse { pub id: NodeId, } -#[serde_as] #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantLocationConfigRequest { - #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, #[serde(flatten)] pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it } -#[serde_as] #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantConfigRequest { - #[serde_as(as = 
"DisplayFromStr")] pub tenant_id: TenantId, #[serde(flatten)] pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it @@ -374,10 +362,8 @@ pub enum TenantAttachmentStatus { Failed { reason: String }, } -#[serde_as] #[derive(Serialize, Deserialize, Clone)] pub struct TenantInfo { - #[serde_as(as = "DisplayFromStr")] pub id: TenantId, // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's pub state: TenantState, @@ -388,33 +374,22 @@ pub struct TenantInfo { } /// This represents the output of the "timeline_detail" and "timeline_list" API calls. -#[serde_as] #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TimelineInfo { - #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, - #[serde_as(as = "DisplayFromStr")] pub timeline_id: TimelineId, - #[serde_as(as = "Option")] pub ancestor_timeline_id: Option, - #[serde_as(as = "Option")] pub ancestor_lsn: Option, - #[serde_as(as = "DisplayFromStr")] pub last_record_lsn: Lsn, - #[serde_as(as = "Option")] pub prev_record_lsn: Option, - #[serde_as(as = "DisplayFromStr")] pub latest_gc_cutoff_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub disk_consistent_lsn: Lsn, /// The LSN that we have succesfully uploaded to remote storage - #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, /// The LSN that we are advertizing to safekeepers - #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn_visible: Lsn, pub current_logical_size: Option, // is None when timeline is Unloaded @@ -426,7 +401,6 @@ pub struct TimelineInfo { pub timeline_dir_layer_file_size_sum: Option, pub wal_source_connstr: Option, - #[serde_as(as = "Option")] pub last_received_msg_lsn: Option, /// the timestamp (in microseconds) of the last received message pub last_received_msg_ts: Option, @@ -523,23 +497,13 @@ pub struct LayerAccessStats { pub residence_events_history: HistoryBufferWithDropCounter, } -#[serde_as] #[derive(Debug, Clone, 
Serialize)] #[serde(tag = "kind")] pub enum InMemoryLayerInfo { - Open { - #[serde_as(as = "DisplayFromStr")] - lsn_start: Lsn, - }, - Frozen { - #[serde_as(as = "DisplayFromStr")] - lsn_start: Lsn, - #[serde_as(as = "DisplayFromStr")] - lsn_end: Lsn, - }, + Open { lsn_start: Lsn }, + Frozen { lsn_start: Lsn, lsn_end: Lsn }, } -#[serde_as] #[derive(Debug, Clone, Serialize)] #[serde(tag = "kind")] pub enum HistoricLayerInfo { @@ -547,9 +511,7 @@ pub enum HistoricLayerInfo { layer_file_name: String, layer_file_size: u64, - #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, - #[serde_as(as = "DisplayFromStr")] lsn_end: Lsn, remote: bool, access_stats: LayerAccessStats, @@ -558,7 +520,6 @@ pub enum HistoricLayerInfo { layer_file_name: String, layer_file_size: u64, - #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, remote: bool, access_stats: LayerAccessStats, diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index eaea266f321c..786712deb1c5 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -1,23 +1,18 @@ use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use utils::{ id::{NodeId, TenantId, TimelineId}, lsn::Lsn, }; -#[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { - #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, - #[serde_as(as = "DisplayFromStr")] pub timeline_id: TimelineId, pub peer_ids: Option>, pub pg_version: u32, pub system_id: Option, pub wal_seg_size: Option, - #[serde_as(as = "DisplayFromStr")] pub commit_lsn: Lsn, // If not passed, it is assigned to the beginning of commit_lsn segment. pub local_start_lsn: Option, @@ -28,7 +23,6 @@ fn lsn_invalid() -> Lsn { } /// Data about safekeeper's timeline, mirrors broker.proto. -#[serde_as] #[derive(Debug, Clone, Deserialize, Serialize)] pub struct SkTimelineInfo { /// Term. @@ -36,25 +30,19 @@ pub struct SkTimelineInfo { /// Term of the last entry. 
pub last_log_term: Option, /// LSN of the last record. - #[serde_as(as = "DisplayFromStr")] #[serde(default = "lsn_invalid")] pub flush_lsn: Lsn, /// Up to which LSN safekeeper regards its WAL as committed. - #[serde_as(as = "DisplayFromStr")] #[serde(default = "lsn_invalid")] pub commit_lsn: Lsn, /// LSN up to which safekeeper has backed WAL. - #[serde_as(as = "DisplayFromStr")] #[serde(default = "lsn_invalid")] pub backup_lsn: Lsn, /// LSN of last checkpoint uploaded by pageserver. - #[serde_as(as = "DisplayFromStr")] #[serde(default = "lsn_invalid")] pub remote_consistent_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] #[serde(default = "lsn_invalid")] pub peer_horizon_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] #[serde(default = "lsn_invalid")] pub local_start_lsn: Lsn, /// A connection string to use for WAL receiving. diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 27df0265b412..3f4ef2abebad 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -55,6 +55,7 @@ bytes.workspace = true criterion.workspace = true hex-literal.workspace = true camino-tempfile.workspace = true +serde_assert.workspace = true [[bench]] name = "benchmarks" diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index 54b90fa07079..37299e4e7fb9 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -9,7 +9,6 @@ use jsonwebtoken::{ decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, }; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use crate::id::TenantId; @@ -32,11 +31,9 @@ pub enum Scope { } /// JWT payload. 
See docs/authentication.md for the format -#[serde_as] #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Claims { #[serde(default)] - #[serde_as(as = "Option")] pub tenant_id: Option, pub scope: Scope, } diff --git a/libs/utils/src/hex.rs b/libs/utils/src/hex.rs new file mode 100644 index 000000000000..fc0bb7e4a280 --- /dev/null +++ b/libs/utils/src/hex.rs @@ -0,0 +1,41 @@ +/// Useful type for asserting that expected bytes match reporting the bytes more readable +/// array-syntax compatible hex bytes. +/// +/// # Usage +/// +/// ``` +/// use utils::Hex; +/// +/// let actual = serialize_something(); +/// let expected = [0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64]; +/// +/// // the type implements PartialEq and on mismatch, both sides are printed in 16 wide multiline +/// // output suffixed with an array style length for easier comparisons. +/// assert_eq!(Hex(&actual), Hex(&expected)); +/// +/// // with `let expected = [0x68];` the error would had been: +/// // assertion `left == right` failed +/// // left: [0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64; 11] +/// // right: [0x68; 1] +/// # fn serialize_something() -> Vec { "hello world".as_bytes().to_vec() } +/// ``` +#[derive(PartialEq)] +pub struct Hex<'a>(pub &'a [u8]); + +impl std::fmt::Debug for Hex<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[")?; + for (i, c) in self.0.chunks(16).enumerate() { + if i > 0 && !c.is_empty() { + writeln!(f, ", ")?; + } + for (j, b) in c.iter().enumerate() { + if j > 0 { + write!(f, ", ")?; + } + write!(f, "0x{b:02x}")?; + } + } + write!(f, "; {}]", self.0.len()) + } +} diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index ec13c2f96f3c..eef6a358e64d 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -3,6 +3,7 @@ use std::{fmt, str::FromStr}; use anyhow::Context; use hex::FromHex; use rand::Rng; +use serde::de::Visitor; use serde::{Deserialize, 
Serialize}; use thiserror::Error; @@ -17,12 +18,74 @@ pub enum IdError { /// /// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look /// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`. -/// -/// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`. -/// Check the `serde_with::serde_as` documentation for options for more complex types. -#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] struct Id([u8; 16]); +impl Serialize for Id { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + serializer.collect_str(self) + } else { + self.0.serialize(serializer) + } + } +} + +impl<'de> Deserialize<'de> for Id { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct IdVisitor { + is_human_readable_deserializer: bool, + } + + impl<'de> Visitor<'de> for IdVisitor { + type Value = Id; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + if self.is_human_readable_deserializer { + formatter.write_str("value in form of hex string") + } else { + formatter.write_str("value in form of integer array([u8; 16])") + } + } + + fn visit_seq(self, seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let s = serde::de::value::SeqAccessDeserializer::new(seq); + let id: [u8; 16] = Deserialize::deserialize(s)?; + Ok(Id::from(id)) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + Id::from_str(v).map_err(E::custom) + } + } + + if deserializer.is_human_readable() { + deserializer.deserialize_str(IdVisitor { + is_human_readable_deserializer: true, + }) + } else { + deserializer.deserialize_tuple( + 16, + IdVisitor { + is_human_readable_deserializer: false, + }, + ) + } + } +} + impl Id { pub fn 
get_from_buf(buf: &mut impl bytes::Buf) -> Id { let mut arr = [0u8; 16]; @@ -308,3 +371,112 @@ impl fmt::Display for NodeId { write!(f, "{}", self.0) } } + +#[cfg(test)] +mod tests { + use serde_assert::{Deserializer, Serializer, Token, Tokens}; + + use crate::bin_ser::BeSer; + + use super::*; + + #[test] + fn test_id_serde_non_human_readable() { + let original_id = Id([ + 173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24, + ]); + let expected_tokens = Tokens(vec![ + Token::Tuple { len: 16 }, + Token::U8(173), + Token::U8(80), + Token::U8(132), + Token::U8(115), + Token::U8(129), + Token::U8(226), + Token::U8(72), + Token::U8(254), + Token::U8(170), + Token::U8(201), + Token::U8(135), + Token::U8(108), + Token::U8(199), + Token::U8(26), + Token::U8(228), + Token::U8(24), + Token::TupleEnd, + ]); + + let serializer = Serializer::builder().is_human_readable(false).build(); + let serialized_tokens = original_id.serialize(&serializer).unwrap(); + assert_eq!(serialized_tokens, expected_tokens); + + let mut deserializer = Deserializer::builder() + .is_human_readable(false) + .tokens(serialized_tokens) + .build(); + let deserialized_id = Id::deserialize(&mut deserializer).unwrap(); + assert_eq!(deserialized_id, original_id); + } + + #[test] + fn test_id_serde_human_readable() { + let original_id = Id([ + 173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24, + ]); + let expected_tokens = Tokens(vec![Token::Str(String::from( + "ad50847381e248feaac9876cc71ae418", + ))]); + + let serializer = Serializer::builder().is_human_readable(true).build(); + let serialized_tokens = original_id.serialize(&serializer).unwrap(); + assert_eq!(serialized_tokens, expected_tokens); + + let mut deserializer = Deserializer::builder() + .is_human_readable(true) + .tokens(Tokens(vec![Token::Str(String::from( + "ad50847381e248feaac9876cc71ae418", + ))])) + .build(); + assert_eq!(Id::deserialize(&mut deserializer).unwrap(), original_id); + } + + 
macro_rules! roundtrip_type { + ($type:ty, $expected_bytes:expr) => {{ + let expected_bytes: [u8; 16] = $expected_bytes; + let original_id = <$type>::from(expected_bytes); + + let ser_bytes = original_id.ser().unwrap(); + assert_eq!(ser_bytes, expected_bytes); + + let des_id = <$type>::des(&ser_bytes).unwrap(); + assert_eq!(des_id, original_id); + }}; + } + + #[test] + fn test_id_bincode_serde() { + let expected_bytes = [ + 173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24, + ]; + + roundtrip_type!(Id, expected_bytes); + } + + #[test] + fn test_tenant_id_bincode_serde() { + let expected_bytes = [ + 173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24, + ]; + + roundtrip_type!(TenantId, expected_bytes); + } + + #[test] + fn test_timeline_id_bincode_serde() { + let expected_bytes = [ + 173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24, + ]; + + roundtrip_type!(TimelineId, expected_bytes); + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 88cefd516d75..1e3403402318 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -24,6 +24,10 @@ pub mod auth; // utility functions and helper traits for unified unique id generation/serialization etc. 
pub mod id; + +mod hex; +pub use hex::Hex; + // http endpoint utils pub mod http; diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 7d9baf7d49fd..262dcb8a8a91 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -1,7 +1,7 @@ #![warn(missing_docs)] use camino::Utf8Path; -use serde::{Deserialize, Serialize}; +use serde::{de::Visitor, Deserialize, Serialize}; use std::fmt; use std::ops::{Add, AddAssign}; use std::str::FromStr; @@ -13,10 +13,114 @@ use crate::seqwait::MonotonicCounter; pub const XLOG_BLCKSZ: u32 = 8192; /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)] -#[serde(transparent)] +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash)] pub struct Lsn(pub u64); +impl Serialize for Lsn { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + serializer.collect_str(self) + } else { + self.0.serialize(serializer) + } + } +} + +impl<'de> Deserialize<'de> for Lsn { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct LsnVisitor { + is_human_readable_deserializer: bool, + } + + impl<'de> Visitor<'de> for LsnVisitor { + type Value = Lsn; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + if self.is_human_readable_deserializer { + formatter.write_str( + "value in form of hex string({upper_u32_hex}/{lower_u32_hex}) representing u64 integer", + ) + } else { + formatter.write_str("value in form of integer(u64)") + } + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + Ok(Lsn(v)) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + Lsn::from_str(v).map_err(|e| E::custom(e)) + } + } + + if deserializer.is_human_readable() { + deserializer.deserialize_str(LsnVisitor { + is_human_readable_deserializer: true, + }) + } else { + 
deserializer.deserialize_u64(LsnVisitor { + is_human_readable_deserializer: false, + }) + } + } +} + +/// Allows (de)serialization of an `Lsn` always as `u64`. +/// +/// ### Example +/// +/// ```rust +/// # use serde::{Serialize, Deserialize}; +/// use utils::lsn::Lsn; +/// +/// #[derive(PartialEq, Serialize, Deserialize, Debug)] +/// struct Foo { +/// #[serde(with = "utils::lsn::serde_as_u64")] +/// always_u64: Lsn, +/// } +/// +/// let orig = Foo { always_u64: Lsn(1234) }; +/// +/// let res = serde_json::to_string(&orig).unwrap(); +/// assert_eq!(res, r#"{"always_u64":1234}"#); +/// +/// let foo = serde_json::from_str::(&res).unwrap(); +/// assert_eq!(foo, orig); +/// ``` +/// +pub mod serde_as_u64 { + use super::Lsn; + + /// Serializes the Lsn as u64 disregarding the human readability of the format. + /// + /// Meant to be used via `#[serde(with = "...")]` or `#[serde(serialize_with = "...")]`. + pub fn serialize(lsn: &Lsn, serializer: S) -> Result { + use serde::Serialize; + lsn.0.serialize(serializer) + } + + /// Deserializes the Lsn as u64 disregarding the human readability of the format. + /// + /// Meant to be used via `#[serde(with = "...")]` or `#[serde(deserialize_with = "...")]`. 
+ pub fn deserialize<'de, D: serde::Deserializer<'de>>(deserializer: D) -> Result { + use serde::Deserialize; + u64::deserialize(deserializer).map(Lsn) + } +} + /// We tried to parse an LSN from a string, but failed #[derive(Debug, PartialEq, Eq, thiserror::Error)] #[error("LsnParseError")] @@ -264,8 +368,13 @@ impl MonotonicCounter for RecordLsn { #[cfg(test)] mod tests { + use crate::bin_ser::BeSer; + use super::*; + use serde::ser::Serialize; + use serde_assert::{Deserializer, Serializer, Token, Tokens}; + #[test] fn test_lsn_strings() { assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555))); @@ -341,4 +450,95 @@ mod tests { assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678)); assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000)); } + + #[test] + fn test_lsn_serde() { + let original_lsn = Lsn(0x0123456789abcdef); + let expected_readable_tokens = Tokens(vec![Token::U64(0x0123456789abcdef)]); + let expected_non_readable_tokens = + Tokens(vec![Token::Str(String::from("1234567/89ABCDEF"))]); + + // Testing human_readable ser/de + let serializer = Serializer::builder().is_human_readable(false).build(); + let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap(); + assert_eq!(readable_ser_tokens, expected_readable_tokens); + + let mut deserializer = Deserializer::builder() + .is_human_readable(false) + .tokens(readable_ser_tokens) + .build(); + let des_lsn = Lsn::deserialize(&mut deserializer).unwrap(); + assert_eq!(des_lsn, original_lsn); + + // Testing NON human_readable ser/de + let serializer = Serializer::builder().is_human_readable(true).build(); + let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap(); + assert_eq!(non_readable_ser_tokens, expected_non_readable_tokens); + + let mut deserializer = Deserializer::builder() + .is_human_readable(true) + .tokens(non_readable_ser_tokens) + .build(); + let des_lsn = Lsn::deserialize(&mut deserializer).unwrap(); + assert_eq!(des_lsn, original_lsn); + + // Testing mismatching ser/de + 
let serializer = Serializer::builder().is_human_readable(false).build(); + let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap(); + + let mut deserializer = Deserializer::builder() + .is_human_readable(true) + .tokens(non_readable_ser_tokens) + .build(); + Lsn::deserialize(&mut deserializer).unwrap_err(); + + let serializer = Serializer::builder().is_human_readable(true).build(); + let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap(); + + let mut deserializer = Deserializer::builder() + .is_human_readable(false) + .tokens(readable_ser_tokens) + .build(); + Lsn::deserialize(&mut deserializer).unwrap_err(); + } + + #[test] + fn test_lsn_ensure_roundtrip() { + let original_lsn = Lsn(0xaaaabbbb); + + let serializer = Serializer::builder().is_human_readable(false).build(); + let ser_tokens = original_lsn.serialize(&serializer).unwrap(); + + let mut deserializer = Deserializer::builder() + .is_human_readable(false) + .tokens(ser_tokens) + .build(); + + let des_lsn = Lsn::deserialize(&mut deserializer).unwrap(); + assert_eq!(des_lsn, original_lsn); + } + + #[test] + fn test_lsn_bincode_serde() { + let lsn = Lsn(0x0123456789abcdef); + let expected_bytes = [0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef]; + + let ser_bytes = lsn.ser().unwrap(); + assert_eq!(ser_bytes, expected_bytes); + + let des_lsn = Lsn::des(&ser_bytes).unwrap(); + assert_eq!(des_lsn, lsn); + } + + #[test] + fn test_lsn_bincode_ensure_roundtrip() { + let original_lsn = Lsn(0x01_02_03_04_05_06_07_08); + let expected_bytes = vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]; + + let ser_bytes = original_lsn.ser().unwrap(); + assert_eq!(ser_bytes, expected_bytes); + + let des_lsn = Lsn::des(&ser_bytes).unwrap(); + assert_eq!(des_lsn, original_lsn); + } } diff --git a/libs/utils/src/pageserver_feedback.rs b/libs/utils/src/pageserver_feedback.rs index a3b53201d305..c9fbdde92847 100644 --- a/libs/utils/src/pageserver_feedback.rs +++ 
b/libs/utils/src/pageserver_feedback.rs @@ -3,7 +3,6 @@ use std::time::{Duration, SystemTime}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use pq_proto::{read_cstr, PG_EPOCH}; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use tracing::{trace, warn}; use crate::lsn::Lsn; @@ -15,21 +14,17 @@ use crate::lsn::Lsn; /// /// serde Serialize is used only for human readable dump to json (e.g. in /// safekeepers debug_dump). -#[serde_as] #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct PageserverFeedback { /// Last known size of the timeline. Used to enforce timeline size limit. pub current_timeline_size: u64, /// LSN last received and ingested by the pageserver. Controls backpressure. - #[serde_as(as = "DisplayFromStr")] pub last_received_lsn: Lsn, /// LSN up to which data is persisted by the pageserver to its local disc. /// Controls backpressure. - #[serde_as(as = "DisplayFromStr")] pub disk_consistent_lsn: Lsn, /// LSN up to which data is persisted by the pageserver on s3; safekeepers /// consider WAL before it can be removed. - #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, // Serialize with RFC3339 format. #[serde(with = "serde_systemtime")] diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index 652dd98683d6..c22f218976fe 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -3,7 +3,6 @@ use anyhow::Context; use chrono::{DateTime, Utc}; use consumption_metrics::EventType; use futures::stream::StreamExt; -use serde_with::serde_as; use std::{sync::Arc, time::SystemTime}; use utils::{ id::{TenantId, TimelineId}, @@ -42,13 +41,10 @@ pub(super) enum Name { /// /// This is a denormalization done at the MetricsKey const methods; these should not be constructed /// elsewhere. 
-#[serde_with::serde_as] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub(crate) struct MetricsKey { - #[serde_as(as = "serde_with::DisplayFromStr")] pub(super) tenant_id: TenantId, - #[serde_as(as = "Option")] #[serde(skip_serializing_if = "Option::is_none")] pub(super) timeline_id: Option, diff --git a/pageserver/src/consumption_metrics/upload.rs b/pageserver/src/consumption_metrics/upload.rs index d69d43a2a8c8..322ed95cc875 100644 --- a/pageserver/src/consumption_metrics/upload.rs +++ b/pageserver/src/consumption_metrics/upload.rs @@ -1,5 +1,4 @@ use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE}; -use serde_with::serde_as; use tokio_util::sync::CancellationToken; use tracing::Instrument; @@ -7,12 +6,9 @@ use super::{metrics::Name, Cache, MetricsKey, RawMetric}; use utils::id::{TenantId, TimelineId}; /// How the metrics from pageserver are identified. -#[serde_with::serde_as] #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)] struct Ids { - #[serde_as(as = "serde_with::DisplayFromStr")] pub(super) tenant_id: TenantId, - #[serde_as(as = "Option")] #[serde(skip_serializing_if = "Option::is_none")] pub(super) timeline_id: Option, } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 7b2db929fa0d..b6f889c682d4 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -18,7 +18,6 @@ use hex::FromHex; use remote_storage::{GenericRemoteStorage, RemotePath}; use serde::Deserialize; use serde::Serialize; -use serde_with::serde_as; use thiserror::Error; use tokio; use tokio_util::sync::CancellationToken; @@ -215,7 +214,6 @@ where /// during recovery as startup. 
const TEMP_SUFFIX: &str = "tmp"; -#[serde_as] #[derive(Debug, Serialize, Deserialize)] struct DeletionList { /// Serialization version, for future use @@ -244,7 +242,6 @@ struct DeletionList { validated: bool, } -#[serde_as] #[derive(Debug, Serialize, Deserialize)] struct DeletionHeader { /// Serialization version, for future use diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 2c46d733d678..a6fb26b298b9 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -17,7 +17,6 @@ use pageserver_api::models::{ TenantLoadRequest, TenantLocationConfigRequest, }; use remote_storage::GenericRemoteStorage; -use serde_with::{serde_as, DisplayFromStr}; use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -499,10 +498,8 @@ async fn get_lsn_by_timestamp_handler( let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?; if version.unwrap_or(0) > 1 { - #[serde_as] #[derive(serde::Serialize)] struct Result { - #[serde_as(as = "DisplayFromStr")] lsn: Lsn, kind: &'static str, } @@ -811,10 +808,8 @@ async fn tenant_size_handler( } /// The type resides in the pageserver not to expose `ModelInputs`. - #[serde_with::serde_as] #[derive(serde::Serialize)] struct TenantHistorySize { - #[serde_as(as = "serde_with::DisplayFromStr")] id: TenantId, /// Size is a mixture of WAL and logical size, so the unit is bytes. 
/// diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 75ffe0969614..38fd42674605 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -406,4 +406,123 @@ mod tests { METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION ); } + + #[test] + fn test_metadata_bincode_serde() { + let original_metadata = TimelineMetadata::new( + Lsn(0x200), + Some(Lsn(0x100)), + Some(TIMELINE_ID), + Lsn(0), + Lsn(0), + Lsn(0), + // Any version will do here, so use the default + crate::DEFAULT_PG_VERSION, + ); + let metadata_bytes = original_metadata + .to_bytes() + .expect("Cannot create bytes array from metadata"); + + let metadata_bincode_be_bytes = original_metadata + .ser() + .expect("Cannot serialize the metadata"); + + // 8 bytes for the length of the vector + assert_eq!(metadata_bincode_be_bytes.len(), 8 + metadata_bytes.len()); + + let expected_bincode_bytes = { + let mut temp = vec![]; + let len_bytes = metadata_bytes.len().to_be_bytes(); + temp.extend_from_slice(&len_bytes); + temp.extend_from_slice(&metadata_bytes); + temp + }; + assert_eq!(metadata_bincode_be_bytes, expected_bincode_bytes); + + let deserialized_metadata = TimelineMetadata::des(&metadata_bincode_be_bytes).unwrap(); + // Deserialized metadata has the metadata header, which is different from the serialized one. 
+ // Reference: TimelineMetaData::to_bytes() + let expected_metadata = { + let mut temp_metadata = original_metadata; + let body_bytes = temp_metadata + .body + .ser() + .expect("Cannot serialize the metadata body"); + let metadata_size = METADATA_HDR_SIZE + body_bytes.len(); + let hdr = TimelineMetadataHeader { + size: metadata_size as u16, + format_version: METADATA_FORMAT_VERSION, + checksum: crc32c::crc32c(&body_bytes), + }; + temp_metadata.hdr = hdr; + temp_metadata + }; + assert_eq!(deserialized_metadata, expected_metadata); + } + + #[test] + fn test_metadata_bincode_serde_ensure_roundtrip() { + let original_metadata = TimelineMetadata::new( + Lsn(0x200), + Some(Lsn(0x100)), + Some(TIMELINE_ID), + Lsn(0), + Lsn(0), + Lsn(0), + // Any version will do here, so use the default + crate::DEFAULT_PG_VERSION, + ); + let expected_bytes = vec![ + /* bincode length encoding bytes */ + 0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector + /* TimelineMetadataHeader */ + 4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2) + /* TimelineMetadataBodyV2 */ + 0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes) + 1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes) + 1, 17, 34, 51, 68, 85, 102, 119, 136, 17, 34, 51, 68, 85, 102, 119, + 136, // ancestor_timeline (17 bytes) + 0, 0, 0, 0, 0, 0, 0, 0, // ancestor_lsn (8 bytes) + 0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes) + 0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes) + 0, 0, 0, 15, // pg_version (4 bytes) + /* padding bytes */ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, + ]; + let metadata_ser_bytes = original_metadata.ser().unwrap(); + assert_eq!(metadata_ser_bytes, expected_bytes); + + let expected_metadata = { + let mut temp_metadata = original_metadata; + let body_bytes = temp_metadata + .body + .ser() + .expect("Cannot serialize the metadata body"); + let metadata_size = METADATA_HDR_SIZE + body_bytes.len(); + let hdr = TimelineMetadataHeader { + size: metadata_size as u16, + format_version: METADATA_FORMAT_VERSION, + checksum: crc32c::crc32c(&body_bytes), + }; + temp_metadata.hdr = hdr; + temp_metadata + }; + let des_metadata = TimelineMetadata::des(&metadata_ser_bytes).unwrap(); + assert_eq!(des_metadata, expected_metadata); + } } diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index fdab74a8befb..fa0679c7a2fe 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -6,7 +6,6 @@ use std::collections::HashMap; use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; -use 
serde_with::{serde_as, DisplayFromStr}; use utils::bin_ser::SerializeError; use crate::tenant::metadata::TimelineMetadata; @@ -58,7 +57,6 @@ impl LayerFileMetadata { /// /// This type needs to be backwards and forwards compatible. When changing the fields, /// remember to add a test case for the changed version. -#[serde_as] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct IndexPart { /// Debugging aid describing the version of this type. @@ -78,7 +76,6 @@ pub struct IndexPart { // 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata. // It's duplicated for convenience when reading the serialized structure, but is // private because internally we would read from metadata instead. - #[serde_as(as = "DisplayFromStr")] disk_consistent_lsn: Lsn, #[serde(rename = "metadata_bytes")] diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index e737d3f59c0b..5e8ee2b99e0c 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -29,7 +29,6 @@ use tenant_size_model::{Segment, StorageModel}; /// needs. We will convert this into a StorageModel when it's time to perform /// the calculation. /// -#[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ModelInputs { pub segments: Vec, @@ -37,11 +36,9 @@ pub struct ModelInputs { } /// A [`Segment`], with some extra information for display purposes -#[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct SegmentMeta { pub segment: Segment, - #[serde_as(as = "serde_with::DisplayFromStr")] pub timeline_id: TimelineId, pub kind: LsnKind, } @@ -77,32 +74,22 @@ pub enum LsnKind { /// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as /// part of [`ModelInputs`] from the HTTP api, explaining the inputs. 
-#[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct TimelineInputs { - #[serde_as(as = "serde_with::DisplayFromStr")] pub timeline_id: TimelineId, - #[serde_as(as = "Option")] pub ancestor_id: Option, - #[serde_as(as = "serde_with::DisplayFromStr")] ancestor_lsn: Lsn, - #[serde_as(as = "serde_with::DisplayFromStr")] last_record: Lsn, - #[serde_as(as = "serde_with::DisplayFromStr")] latest_gc_cutoff: Lsn, - #[serde_as(as = "serde_with::DisplayFromStr")] horizon_cutoff: Lsn, - #[serde_as(as = "serde_with::DisplayFromStr")] pitr_cutoff: Lsn, /// Cutoff point based on GC settings - #[serde_as(as = "serde_with::DisplayFromStr")] next_gc_cutoff: Lsn, /// Cutoff point calculated from the user-supplied 'max_retention_period' - #[serde_as(as = "Option")] retention_param_cutoff: Option, } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4608683c4a7c..baa97b6cf9cc 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2936,13 +2936,10 @@ struct CompactLevel0Phase1StatsBuilder { new_deltas_size: Option, } -#[serde_as] #[derive(serde::Serialize)] struct CompactLevel0Phase1Stats { version: u64, - #[serde_as(as = "serde_with::DisplayFromStr")] tenant_id: TenantId, - #[serde_as(as = "serde_with::DisplayFromStr")] timeline_id: TimelineId, read_lock_acquisition_micros: RecordedDuration, read_lock_held_spawn_blocking_startup_micros: RecordedDuration, diff --git a/s3_scrubber/src/cloud_admin_api.rs b/s3_scrubber/src/cloud_admin_api.rs index 1b28dc4dff0b..151421c84f1e 100644 --- a/s3_scrubber/src/cloud_admin_api.rs +++ b/s3_scrubber/src/cloud_admin_api.rs @@ -5,6 +5,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use hex::FromHex; +use pageserver::tenant::Tenant; use reqwest::{header, Client, StatusCode, Url}; use serde::Deserialize; use tokio::sync::Semaphore; @@ -118,13 +119,18 @@ fn from_nullable_id<'de, D>(deserializer: D) -> Result where D: 
serde::de::Deserializer<'de>, { - let id_str = String::deserialize(deserializer)?; - if id_str.is_empty() { - // This is a bogus value, but for the purposes of the scrubber all that - // matters is that it doesn't collide with any real IDs. - Ok(TenantId::from([0u8; 16])) + if deserializer.is_human_readable() { + let id_str = String::deserialize(deserializer)?; + if id_str.is_empty() { + // This is a bogus value, but for the purposes of the scrubber all that + // matters is that it doesn't collide with any real IDs. + Ok(TenantId::from([0u8; 16])) + } else { + TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}"))) + } } else { - TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}"))) + let id_arr = <[u8; 16]>::deserialize(deserializer)?; + Ok(TenantId::from(id_arr)) } } @@ -153,7 +159,6 @@ pub struct ProjectData { pub maintenance_set: Option, } -#[serde_with::serde_as] #[derive(Debug, serde::Deserialize)] pub struct BranchData { pub id: BranchId, @@ -161,12 +166,10 @@ pub struct BranchData { pub updated_at: DateTime, pub name: String, pub project_id: ProjectId, - #[serde_as(as = "serde_with::DisplayFromStr")] pub timeline_id: TimelineId, #[serde(default)] pub parent_id: Option, #[serde(default)] - #[serde_as(as = "Option")] pub parent_lsn: Option, pub default: bool, pub deleted: bool, diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index a7f17df797f7..a0be2b205468 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -13,7 +13,7 @@ use utils::{ }; /// Persistent consensus state of the acceptor. 
-#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] struct AcceptorStateV1 { /// acceptor's last term it voted for (advanced in 1 phase) term: Term, @@ -21,7 +21,7 @@ struct AcceptorStateV1 { epoch: Term, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] struct SafeKeeperStateV1 { /// persistent acceptor state acceptor_state: AcceptorStateV1, @@ -50,7 +50,7 @@ pub struct ServerInfoV2 { pub wal_seg_size: u32, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct SafeKeeperStateV2 { /// persistent acceptor state pub acceptor_state: AcceptorState, @@ -81,7 +81,7 @@ pub struct ServerInfoV3 { pub wal_seg_size: u32, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct SafeKeeperStateV3 { /// persistent acceptor state pub acceptor_state: AcceptorState, @@ -101,7 +101,7 @@ pub struct SafeKeeperStateV3 { pub wal_start_lsn: Lsn, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct SafeKeeperStateV4 { #[serde(with = "hex")] pub tenant_id: TenantId, @@ -264,3 +264,245 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result } bail!("unsupported safekeeper control file version {}", version) } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use utils::{id::NodeId, Hex}; + + use crate::safekeeper::PersistedPeerInfo; + + use super::*; + + #[test] + fn roundtrip_v1() { + let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(); + let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap(); + let state = SafeKeeperStateV1 { + acceptor_state: AcceptorStateV1 { + term: 42, + epoch: 43, + }, + server: ServerInfoV2 { + pg_version: 14, + system_id: 0x1234567887654321, + tenant_id, + timeline_id, + 
wal_seg_size: 0x12345678, + }, + proposer_uuid: { + let mut arr = timeline_id.as_arr(); + arr.reverse(); + arr + }, + commit_lsn: Lsn(1234567800), + truncate_lsn: Lsn(123456780), + wal_start_lsn: Lsn(1234567800 - 8), + }; + + let ser = state.ser().unwrap(); + #[rustfmt::skip] + let expected = [ + // term + 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // epoch + 0x2b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // pg_version + 0x0e, 0x00, 0x00, 0x00, + // system_id + 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12, + // tenant_id + 0xcf, 0x04, 0x80, 0x92, 0x97, 0x07, 0xee, 0x75, 0x37, 0x23, 0x37, 0xef, 0xaa, 0x5e, 0xcf, 0x96, + // timeline_id + 0x11, 0x2d, 0xed, 0x66, 0x42, 0x2a, 0xa5, 0xe9, 0x53, 0xe5, 0x44, 0x0f, 0xa5, 0x42, 0x7a, 0xc4, + // wal_seg_size + 0x78, 0x56, 0x34, 0x12, + // proposer_uuid + 0xc4, 0x7a, 0x42, 0xa5, 0x0f, 0x44, 0xe5, 0x53, 0xe9, 0xa5, 0x2a, 0x42, 0x66, 0xed, 0x2d, 0x11, + // commit_lsn + 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + // truncate_lsn + 0x0c, 0xcd, 0x5b, 0x07, 0x00, 0x00, 0x00, 0x00, + // wal_start_lsn + 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + ]; + + assert_eq!(Hex(&ser), Hex(&expected)); + + let deser = SafeKeeperStateV1::des(&ser).unwrap(); + + assert_eq!(state, deser); + } + + #[test] + fn roundtrip_v2() { + let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(); + let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap(); + let state = SafeKeeperStateV2 { + acceptor_state: AcceptorState { + term: 42, + term_history: TermHistory(vec![TermLsn { + lsn: Lsn(0x1), + term: 41, + }]), + }, + server: ServerInfoV2 { + pg_version: 14, + system_id: 0x1234567887654321, + tenant_id, + timeline_id, + wal_seg_size: 0x12345678, + }, + proposer_uuid: { + let mut arr = timeline_id.as_arr(); + arr.reverse(); + arr + }, + commit_lsn: Lsn(1234567800), + truncate_lsn: Lsn(123456780), + wal_start_lsn: Lsn(1234567800 - 8), + }; + + let ser = state.ser().unwrap(); + let expected 
= [ + 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00, 0x00, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, + 0x34, 0x12, 0xcf, 0x04, 0x80, 0x92, 0x97, 0x07, 0xee, 0x75, 0x37, 0x23, 0x37, 0xef, + 0xaa, 0x5e, 0xcf, 0x96, 0x11, 0x2d, 0xed, 0x66, 0x42, 0x2a, 0xa5, 0xe9, 0x53, 0xe5, + 0x44, 0x0f, 0xa5, 0x42, 0x7a, 0xc4, 0x78, 0x56, 0x34, 0x12, 0xc4, 0x7a, 0x42, 0xa5, + 0x0f, 0x44, 0xe5, 0x53, 0xe9, 0xa5, 0x2a, 0x42, 0x66, 0xed, 0x2d, 0x11, 0x78, 0x02, + 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, 0x0c, 0xcd, 0x5b, 0x07, 0x00, 0x00, 0x00, 0x00, + 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + ]; + + assert_eq!(Hex(&ser), Hex(&expected)); + + let deser = SafeKeeperStateV2::des(&ser).unwrap(); + + assert_eq!(state, deser); + } + + #[test] + fn roundtrip_v3() { + let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(); + let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap(); + let state = SafeKeeperStateV3 { + acceptor_state: AcceptorState { + term: 42, + term_history: TermHistory(vec![TermLsn { + lsn: Lsn(0x1), + term: 41, + }]), + }, + server: ServerInfoV3 { + pg_version: 14, + system_id: 0x1234567887654321, + tenant_id, + timeline_id, + wal_seg_size: 0x12345678, + }, + proposer_uuid: { + let mut arr = timeline_id.as_arr(); + arr.reverse(); + arr + }, + commit_lsn: Lsn(1234567800), + truncate_lsn: Lsn(123456780), + wal_start_lsn: Lsn(1234567800 - 8), + }; + + let ser = state.ser().unwrap(); + let expected = [ + 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00, 0x00, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, + 0x34, 0x12, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x66, 0x30, 0x34, + 0x38, 0x30, 0x39, 0x32, 
0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, + 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36, + 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, + 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, + 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34, 0x78, 0x56, + 0x34, 0x12, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x34, 0x37, 0x61, + 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, + 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31, + 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, 0x0c, 0xcd, 0x5b, 0x07, 0x00, 0x00, + 0x00, 0x00, 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + ]; + + assert_eq!(Hex(&ser), Hex(&expected)); + + let deser = SafeKeeperStateV3::des(&ser).unwrap(); + + assert_eq!(state, deser); + } + + #[test] + fn roundtrip_v4() { + let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(); + let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap(); + let state = SafeKeeperStateV4 { + tenant_id, + timeline_id, + acceptor_state: AcceptorState { + term: 42, + term_history: TermHistory(vec![TermLsn { + lsn: Lsn(0x1), + term: 41, + }]), + }, + server: ServerInfo { + pg_version: 14, + system_id: 0x1234567887654321, + wal_seg_size: 0x12345678, + }, + proposer_uuid: { + let mut arr = timeline_id.as_arr(); + arr.reverse(); + arr + }, + peers: PersistedPeers(vec![( + NodeId(1), + PersistedPeerInfo { + backup_lsn: Lsn(1234567000), + term: 42, + flush_lsn: Lsn(1234567800 - 8), + commit_lsn: Lsn(1234567600), + }, + )]), + commit_lsn: Lsn(1234567800), + s3_wal_lsn: Lsn(1234567300), + peer_horizon_lsn: Lsn(9999999), + remote_consistent_lsn: Lsn(1234560000), + }; + + let ser = state.ser().unwrap(); + let expected = [ + 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x66, 0x30, 0x34, 0x38, 
0x30, + 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, + 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36, 0x20, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, + 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, + 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34, 0x2a, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x29, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x0e, 0x00, 0x00, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12, 0x78, 0x56, + 0x34, 0x12, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x34, 0x37, 0x61, + 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, + 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31, + 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, + 0x00, 0x00, 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xe4, 0x95, 0x49, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00, + 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, + 0x00, 0x00, 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + ]; + + assert_eq!(Hex(&ser), Hex(&expected)); + + let deser = SafeKeeperStateV4::des(&ser).unwrap(); + + assert_eq!(state, deser); + } +} diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index 8cbc0aa47f8b..daf9255ecbac 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -14,7 +14,6 @@ use postgres_ffi::XLogSegNo; use serde::Deserialize; use serde::Serialize; -use serde_with::{serde_as, DisplayFromStr}; use utils::id::NodeId; use utils::id::TenantTimelineId; use utils::id::{TenantId, TimelineId}; 
@@ -136,12 +135,9 @@ pub struct Config { pub wal_backup_enabled: bool, } -#[serde_as] #[derive(Debug, Serialize, Deserialize)] pub struct Timeline { - #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, - #[serde_as(as = "DisplayFromStr")] pub timeline_id: TimelineId, pub control_file: Option, pub memory: Option, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 474d63644108..06b903719e1f 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -4,7 +4,6 @@ use once_cell::sync::Lazy; use postgres_ffi::WAL_SEGMENT_SIZE; use safekeeper_api::models::SkTimelineInfo; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use std::collections::{HashMap, HashSet}; use std::fmt; use std::str::FromStr; @@ -67,11 +66,9 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { /// Same as TermLsn, but serializes LSN using display serializer /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. -#[serde_as] #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct TermSwitchApiEntry { pub term: Term, - #[serde_as(as = "DisplayFromStr")] pub lsn: Lsn, } @@ -93,28 +90,18 @@ pub struct AcceptorStateStatus { } /// Info about timeline on safekeeper ready for reporting. 
-#[serde_as] #[derive(Debug, Serialize, Deserialize)] pub struct TimelineStatus { - #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, - #[serde_as(as = "DisplayFromStr")] pub timeline_id: TimelineId, pub acceptor_state: AcceptorStateStatus, pub pg_info: ServerInfo, - #[serde_as(as = "DisplayFromStr")] pub flush_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub timeline_start_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub local_start_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub commit_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub backup_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub peer_horizon_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, pub peers: Vec, pub walsenders: Vec, diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 2ad2ca7706d0..303bdd67fef8 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -44,8 +44,11 @@ pub struct AppendLogicalMessage { // fields from AppendRequestHeader pub term: Term, + #[serde(with = "utils::lsn::serde_as_u64")] pub epoch_start_lsn: Lsn, + #[serde(with = "utils::lsn::serde_as_u64")] pub begin_lsn: Lsn, + #[serde(with = "utils::lsn::serde_as_u64")] pub truncate_lsn: Lsn, pub pg_version: u32, } diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index e2f1b9fcff7e..ad3a18a5369b 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -6,8 +6,6 @@ use tokio::io::AsyncWriteExt; use tracing::info; use utils::id::{TenantId, TenantTimelineId, TimelineId}; -use serde_with::{serde_as, DisplayFromStr}; - use crate::{ control_file, debug_dump, http::routes::TimelineStatus, @@ -16,12 +14,9 @@ use crate::{ }; /// Info about timeline on safekeeper ready for reporting. 
-#[serde_as] #[derive(Debug, Serialize, Deserialize)] pub struct Request { - #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, - #[serde_as(as = "DisplayFromStr")] pub timeline_id: TimelineId, pub http_hosts: Vec, } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 1437bdb50ec8..47a624281d43 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -52,7 +52,7 @@ impl From<(Term, Lsn)> for TermLsn { } } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, PartialEq)] pub struct TermHistory(pub Vec); impl TermHistory { @@ -178,7 +178,7 @@ impl fmt::Debug for TermHistory { pub type PgUuid = [u8; 16]; /// Persistent consensus state of the acceptor. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct AcceptorState { /// acceptor's last term it voted for (advanced in 1 phase) pub term: Term, @@ -209,16 +209,16 @@ pub struct ServerInfo { pub wal_seg_size: u32, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedPeerInfo { /// LSN up to which safekeeper offloaded WAL to s3. - backup_lsn: Lsn, + pub backup_lsn: Lsn, /// Term of the last entry. - term: Term, + pub term: Term, /// LSN of the last record. - flush_lsn: Lsn, + pub flush_lsn: Lsn, /// Up to which LSN safekeeper regards its WAL as committed. - commit_lsn: Lsn, + pub commit_lsn: Lsn, } impl PersistedPeerInfo { @@ -232,12 +232,12 @@ impl PersistedPeerInfo { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>); /// Persistent information stored on safekeeper node /// On disk data is prefixed by magic and format version and followed by checksum. 
-#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct SafeKeeperState { #[serde(with = "hex")] pub tenant_id: TenantId, @@ -1096,7 +1096,7 @@ mod tests { use super::*; use crate::wal_storage::Storage; - use std::{ops::Deref, time::Instant}; + use std::{ops::Deref, str::FromStr, time::Instant}; // fake storage for tests struct InMemoryState { @@ -1314,4 +1314,98 @@ mod tests { }) ); } + + #[test] + fn test_sk_state_bincode_serde_roundtrip() { + use utils::Hex; + let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(); + let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap(); + let state = SafeKeeperState { + tenant_id, + timeline_id, + acceptor_state: AcceptorState { + term: 42, + term_history: TermHistory(vec![TermLsn { + lsn: Lsn(0x1), + term: 41, + }]), + }, + server: ServerInfo { + pg_version: 14, + system_id: 0x1234567887654321, + wal_seg_size: 0x12345678, + }, + proposer_uuid: { + let mut arr = timeline_id.as_arr(); + arr.reverse(); + arr + }, + timeline_start_lsn: Lsn(0x12345600), + local_start_lsn: Lsn(0x12), + commit_lsn: Lsn(1234567800), + backup_lsn: Lsn(1234567300), + peer_horizon_lsn: Lsn(9999999), + remote_consistent_lsn: Lsn(1234560000), + peers: PersistedPeers(vec![( + NodeId(1), + PersistedPeerInfo { + backup_lsn: Lsn(1234567000), + term: 42, + flush_lsn: Lsn(1234567800 - 8), + commit_lsn: Lsn(1234567600), + }, + )]), + }; + + let ser = state.ser().unwrap(); + + #[rustfmt::skip] + let expected = [ + // tenant_id as length prefixed hex + 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36, + // timeline_id as length prefixed hex + 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 
0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34, + // term + 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // length prefix + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // unsure why this order is swapped + 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // pg_version + 0x0e, 0x00, 0x00, 0x00, + // systemid + 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12, + // wal_seg_size + 0x78, 0x56, 0x34, 0x12, + // pguuid as length prefixed hex + 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31, + + // timeline_start_lsn + 0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00, + 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00, + // length prefix for persistentpeers + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // nodeid + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // backuplsn + 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00, + 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, + ]; + + assert_eq!(Hex(&ser), Hex(&expected)); + + let deser = SafeKeeperState::des(&ser).unwrap(); + + assert_eq!(deser, state); + } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index cd765dcbce23..44f14f8c7e84 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -16,7 +16,6 @@ use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; use 
serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use tokio::io::{AsyncRead, AsyncWrite}; use utils::id::TenantTimelineId; use utils::lsn::AtomicLsn; @@ -313,10 +312,8 @@ impl WalSendersShared { } // Serialized is used only for pretty printing in json. -#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WalSenderState { - #[serde_as(as = "DisplayFromStr")] ttid: TenantTimelineId, addr: SocketAddr, conn_id: ConnectionId, diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 96d844fa7afb..2ba871207ec8 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -5,10 +5,8 @@ use anyhow::{anyhow, bail, Result}; use camino::Utf8PathBuf; use postgres_ffi::XLogSegNo; use serde::{Deserialize, Serialize}; -use serde_with::serde_as; use tokio::fs; -use serde_with::DisplayFromStr; use std::cmp::max; use std::sync::Arc; use std::time::Duration; @@ -42,7 +40,6 @@ use crate::SafeKeeperConf; use crate::{debug_dump, wal_storage}; /// Things safekeeper should know about timeline state on peers. -#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerInfo { pub sk_id: NodeId, @@ -50,13 +47,10 @@ pub struct PeerInfo { /// Term of the last entry. pub last_log_term: Term, /// LSN of the last record. - #[serde_as(as = "DisplayFromStr")] pub flush_lsn: Lsn, - #[serde_as(as = "DisplayFromStr")] pub commit_lsn: Lsn, /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new /// sk since backup_lsn. - #[serde_as(as = "DisplayFromStr")] pub local_start_lsn: Lsn, /// When info was received. Serde annotations are not very useful but make /// the code compile -- we don't rely on this field externally.