Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

safekeeper: lift benchmarking utils into safekeeper crate #10200

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions libs/postgres_ffi/src/wal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
const TIMELINE_ID: u32 = 1;

/// Creates a new WAL generator with the given record generator.
pub fn new(record_generator: R) -> WalGenerator<R> {
pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
Self {
record_generator,
lsn: Lsn(0),
prev_lsn: Lsn(0),
lsn: start_lsn,
prev_lsn: start_lsn,
}
}

Expand Down
2 changes: 2 additions & 0 deletions safekeeper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
benchmarking = []

[dependencies]
async-stream.workspace = true
Expand Down Expand Up @@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] }
[[bench]]
name = "receive_wal"
harness = false
required-features = ["benchmarking"]
23 changes: 12 additions & 11 deletions safekeeper/benches/receive_wal.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
//! WAL ingestion benchmarks.

#[path = "benchutils.rs"]
mod benchutils;

use std::io::Write as _;

use benchutils::Env;
use bytes::BytesMut;
use camino_tempfile::tempfile;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
Expand All @@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor};
use safekeeper::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
};
use safekeeper::test_utils::Env;
use tokio::io::AsyncWriteExt as _;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
Expand Down Expand Up @@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];

let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));

// Set up the Safekeeper.
let env = Env::new(fsync)?;
let mut safekeeper =
runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
let mut safekeeper = runtime.block_on(env.make_safekeeper(
NodeId(1),
TenantTimelineId::generate(),
Lsn(0),
))?;

b.iter_batched_ref(
// Pre-construct WAL records and requests. Criterion will batch them.
Expand Down Expand Up @@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded

let env = Env::new(fsync)?;
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
let walgen =
&mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));

// Create buffered channels that can fit all requests, to avoid blocking on channels.
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
Expand All @@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
// TODO: WalAcceptor doesn't actually need a full timeline, only
// Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;
Expand Down Expand Up @@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];

let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));

// Construct and spawn the WalAcceptor task.
let env = Env::new(fsync)?;
Expand All @@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {

runtime.block_on(async {
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;
Expand Down
3 changes: 3 additions & 0 deletions safekeeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;

#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;

mod timelines_global_map;
use std::sync::Arc;
pub use timelines_global_map::GlobalTimelines;
Expand Down
28 changes: 15 additions & 13 deletions safekeeper/benches/benchutils.rs → safekeeper/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::sync::Arc;

use crate::rate_limit::RateLimiter;
use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use crate::state::{TimelinePersistentState, TimelineState};
use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::remote_timeline_path;
use crate::{control_file, wal_storage, SafeKeeperConf};
use camino_tempfile::Utf8TempDir;
use safekeeper::rate_limit::RateLimiter;
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use safekeeper::state::{TimelinePersistentState, TimelineState};
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use safekeeper::timelines_set::TimelinesSet;
use safekeeper::wal_backup::remote_timeline_path;
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
use tokio::fs::create_dir_all;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;

/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
pub struct Env {
/// Whether to enable fsync.
pub fsync: bool,
Expand All @@ -21,7 +21,7 @@ pub struct Env {
}

impl Env {
/// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
/// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
/// enable fsyncing.
pub fn new(fsync: bool) -> anyhow::Result<Self> {
let tempdir = camino_tempfile::tempdir()?;
Expand All @@ -47,6 +47,7 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
let conf = self.make_conf(node_id);

Expand All @@ -67,9 +68,9 @@ impl Env {
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
term: 1,
start_streaming_at: Lsn(0),
term_history: TermHistory(vec![(1, Lsn(0)).into()]),
timeline_start_lsn: Lsn(0),
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
}))
.await?;

Expand All @@ -82,12 +83,13 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
let conf = Arc::new(self.make_conf(node_id));
let timeline_dir = get_timeline_dir(&conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;

let safekeeper = self.make_safekeeper(node_id, ttid).await?;
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));

let timeline = Timeline::new(
Expand Down
2 changes: 1 addition & 1 deletion safekeeper/tests/walproposer_sim/walproposer_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl DiskWalProposer {
internal_available_lsn: Lsn(0),
prev_lsn: Lsn(0),
disk: BlockStorage::new(),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
}),
})
}
Expand Down
Loading