diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index a5c94a82c162..cf0cd3a46b88 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -158,7 +158,8 @@ impl ShardIdentity { key_to_shard_number(self.count, self.stripe_size, key) } - /// Return true if the key should be ingested by this shard + /// Return true if the key is stored only on this shard. This does not include + /// global keys, see is_key_global(). /// /// Shards must ingest _at least_ keys which return true from this check. pub fn is_key_local(&self, key: &Key) -> bool { @@ -171,7 +172,7 @@ impl ShardIdentity { } /// Return true if the key should be stored on all shards, not just one. - fn is_key_global(&self, key: &Key) -> bool { + pub fn is_key_global(&self, key: &Key) -> bool { if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() { // Special keys that are only stored on shard 0 false diff --git a/libs/utils/src/shard.rs b/libs/utils/src/shard.rs index 782cddc599b0..6352ea9f9253 100644 --- a/libs/utils/src/shard.rs +++ b/libs/utils/src/shard.rs @@ -164,6 +164,12 @@ impl TenantShardId { } } +impl std::fmt::Display for ShardNumber { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + impl std::fmt::Display for ShardSlug<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index bf3d7a74a35d..0657d1af3a84 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -53,7 +53,7 @@ use utils::{ postgres_client::PostgresClientProtocol, sync::gate::{Gate, GateGuard}, }; -use wal_decoder::serialized_batch::SerializedValueBatch; +use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta}; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -5924,6 +5924,23 @@ impl<'a> TimelineWriter<'a> { return Ok(()); } + // In debug builds, assert that we don't write any keys that don't belong to this shard. + // We don't assert this in release builds, since key ownership policies may change over + // time. Stray keys will be removed during compaction. + if cfg!(debug_assertions) { + for metadata in &batch.metadata { + if let ValueMeta::Serialized(metadata) = metadata { + let key = Key::from_compact(metadata.key); + assert!( + self.shard_identity.is_key_local(&key) + || self.shard_identity.is_key_global(&key), + "key {key} does not belong on shard {}", + self.shard_identity.shard_index() + ); + } + } + } + let batch_max_lsn = batch.max_lsn; let buf_size: u64 = batch.buffer_size() as u64; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 8ececa2bfb46..7f86ede0436c 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1179,11 +1179,12 @@ impl Timeline { .await .map_err(CompactionError::Other)?; } else { - debug!( - "Dropping key {} during compaction (it belongs on shard {:?})", - key, - self.shard_identity.get_shard_number(&key) - ); + let shard = self.shard_identity.shard_index(); + let owner = self.shard_identity.get_shard_number(&key); + if cfg!(debug_assertions) { + panic!("key {key} does not belong on shard {shard}, owned by {owner}"); + } + debug!("dropping key {key} during compaction (it belongs on shard {owner})"); } if !new_layers.is_empty() { @@ -2054,6 +2055,11 @@ impl Timeline { // This is not handled in the filter iterator because shard is determined by hash. // Therefore, it does not give us any performance benefit to do things like skip // a whole layer file as handling key spaces (ranges). + if cfg!(debug_assertions) { + let shard = self.shard_identity.shard_index(); + let owner = self.shard_identity.get_shard_number(&key); + panic!("key {key} does not belong on shard {shard}, owned by {owner}"); + } continue; } if !job_desc.compaction_key_range.contains(&key) {