Skip to content

Commit

Permalink
Extract public sk types to safekeeper_api (#10137)
Browse files Browse the repository at this point in the history
## Problem

We want to extract safekeeper http client to separate crate for use in
storage controller and neon_local. However, many types used in the API
are internal to safekeeper.

## Summary of changes

Move them to safekeeper_api crate. No functional changes.

ref #9011
  • Loading branch information
arssher authored Dec 13, 2024
1 parent 7dc3826 commit ce8eb08
Show file tree
Hide file tree
Showing 24 changed files with 264 additions and 245 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion libs/safekeeper_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ edition.workspace = true
license.workspace = true

[dependencies]
serde.workspace = true
const_format.workspace = true
serde.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
tokio.workspace = true
utils.workspace = true
17 changes: 17 additions & 0 deletions libs/safekeeper_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
use pq_proto::SystemId;
use serde::{Deserialize, Serialize};

/// Public API types
pub mod models;

/// Consensus logical timestamp. Note: it is a part of sk control file.
pub type Term = u64;
pub const INVALID_TERM: Term = 0;

/// Information about Postgres. Safekeeper gets it once and then verifies all
/// further connections from computes match. Note: it is a part of sk control
/// file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfo {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub wal_seg_size: u32,
}

pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

Expand Down
170 changes: 169 additions & 1 deletion libs/safekeeper_api/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
//! Types used in safekeeper http API. Many of them are also reused internally.
use postgres_ffi::TimestampTz;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use tokio::time::Instant;

use utils::{
id::{NodeId, TenantId, TimelineId},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
pageserver_feedback::PageserverFeedback,
};

use crate::{ServerInfo, Term};

#[derive(Debug, Serialize)]
pub struct SafekeeperStatus {
pub id: NodeId,
}

#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
Expand All @@ -18,6 +31,161 @@ pub struct TimelineCreateRequest {
pub local_start_lsn: Option<Lsn>,
}

/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
pub lsn: Lsn,
}

/// Augment AcceptorState with last_log_term for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {
pub term: Term,
pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility
pub term_history: Vec<TermSwitchApiEntry>,
}

/// Things safekeeper should know about timeline state on peers.
/// Used as both model and internally.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
pub term: Term,
/// Term of the last entry.
pub last_log_term: Term,
/// LSN of the last record.
pub flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL.
pub local_start_lsn: Lsn,
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.
#[serde(skip)]
#[serde(default = "Instant::now")]
pub ts: Instant,
pub pg_connstr: String,
pub http_connstr: String,
}

pub type FullTransactionId = u64;

/// Hot standby feedback received from replica
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct HotStandbyFeedback {
pub ts: TimestampTz,
pub xmin: FullTransactionId,
pub catalog_xmin: FullTransactionId,
}

pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;

impl HotStandbyFeedback {
pub fn empty() -> HotStandbyFeedback {
HotStandbyFeedback {
ts: 0,
xmin: 0,
catalog_xmin: 0,
}
}
}

/// Standby status update
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
pub reply_requested: bool,
}

impl StandbyReply {
pub fn empty() -> Self {
StandbyReply {
write_lsn: Lsn::INVALID,
flush_lsn: Lsn::INVALID,
apply_lsn: Lsn::INVALID,
reply_ts: 0,
reply_requested: false,
}
}
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyFeedback {
pub reply: StandbyReply,
pub hs_feedback: HotStandbyFeedback,
}

impl StandbyFeedback {
pub fn empty() -> Self {
StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}

/// Receiver is either pageserver or regular standby, which have different
/// feedbacks.
/// Used as both model and internally.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ReplicationFeedback {
Pageserver(PageserverFeedback),
Standby(StandbyFeedback),
}

/// Uniquely identifies a WAL service connection. Logged in spans for
/// observability.
pub type ConnectionId = u32;

/// Serialize is used only for json'ing in API response. Also used internally.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalSenderState {
pub ttid: TenantTimelineId,
pub addr: SocketAddr,
pub conn_id: ConnectionId,
// postgres application_name
pub appname: Option<String>,
pub feedback: ReplicationFeedback,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
pub conn_id: Option<ConnectionId>,
pub status: WalReceiverStatus,
}

/// Walreceiver status. Currently only whether it passed voting stage and
/// started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverStatus {
Voting,
Streaming,
}

/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineStatus {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub acceptor_state: AcceptorStateStatus,
pub pg_info: ServerInfo,
pub flush_lsn: Lsn,
pub timeline_start_lsn: Lsn,
pub local_start_lsn: Lsn,
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,
pub walreceivers: Vec<WalReceiverState>,
}

fn lsn_invalid() -> Lsn {
Lsn::INVALID
}
Expand Down
3 changes: 2 additions & 1 deletion safekeeper/src/control_file_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Code to deal with safekeeper control file upgrades
use crate::{
safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn},
safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn},
state::{EvictionState, PersistedPeers, TimelinePersistentState},
wal_backup_partial,
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
use safekeeper_api::{ServerInfo, Term};
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::{
Expand Down
2 changes: 1 addition & 1 deletion safekeeper/src/debug_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use postgres_ffi::MAX_SEND_SIZE;
use safekeeper_api::models::WalSenderState;
use serde::Deserialize;
use serde::Serialize;

Expand All @@ -25,7 +26,6 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

use crate::safekeeper::TermHistory;
use crate::send_wal::WalSenderState;
use crate::state::TimelineMemState;
use crate::state::TimelinePersistentState;
use crate::timeline::get_timeline_dir;
Expand Down
4 changes: 2 additions & 2 deletions safekeeper/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use anyhow::Context;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use safekeeper_api::models::ConnectionId;
use safekeeper_api::Term;
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
Expand All @@ -16,9 +18,7 @@ use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};

use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE};
use crate::safekeeper::Term;
use crate::timeline::TimelineError;
use crate::wal_service::ConnectionId;
use crate::{GlobalTimelines, SafeKeeperConf};
use postgres_backend::PostgresBackend;
use postgres_backend::QueryError;
Expand Down
3 changes: 1 addition & 2 deletions safekeeper/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
//! etc.
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::TimelineStatus;
use std::error::Error as _;
use utils::{
http::error::HttpErrorBody,
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};

use super::routes::TimelineStatus;

#[derive(Debug, Clone)]
pub struct Client {
mgmt_api_endpoint: String,
Expand Down
Loading

1 comment on commit ce8eb08

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5160 tests run: 4924 passed, 1 failed, 235 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_idle_checkpoints[debug-pg17]"
Flaky tests (6)

Postgres 17

Postgres 16

  • test_physical_replication_config_mismatch_too_many_known_xids: release-arm64

Postgres 15

Postgres 14

  • test_pgdata_import_smoke[8-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64

Test coverage report is not available

The comment gets automatically updated with the latest test results
ce8eb08 at 2024-12-13T15:05:23.987Z :recycle:

Please sign in to comment.