wal_decoder: prepare interpretation for fan-out
Currently, we call `InterpretedWalRecord::from_bytes_filtered`
from each shard. To serve multiple shards at the same time,
the API needs to allow asking about multiple shards in one call.

This commit tweaks it in a pretty brute-force way. Naively, we could
just compute the owning shard for each key, but pre-split and post-split
shards may be subscribed at the same time, so a key can map to more than
one subscribed shard and doing this efficiently is more complex.
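
As a rough sketch of the new call shape (illustrative only; the helper function, variable names and module paths below are assumptions, not part of this commit's diff):

use bytes::Bytes;
use pageserver_api::shard::ShardIdentity;
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecord;

// Interpret one raw WAL record for several shards at once and hand each
// shard its own interpreted record.
fn interpret_for_shards(
    buf: Bytes,
    shards: &Vec<ShardIdentity>,
    next_record_lsn: Lsn,
    pg_version: u32,
) -> anyhow::Result<()> {
    let records = InterpretedWalRecord::from_bytes_filtered(
        buf,
        shards,
        next_record_lsn,
        pg_version,
    )?;
    // One (shard, record) pair comes back per requested shard.
    for (shard, record) in records {
        // Route `record` to whatever consumes WAL for `shard`,
        // e.g. a per-shard sender.
        let _ = (shard, record);
    }
    Ok(())
}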
VladLazar committed Dec 19, 2024
1 parent a3b29b1 commit cab7a14
Showing 8 changed files with 218 additions and 158 deletions.
6 changes: 3 additions & 3 deletions libs/pageserver_api/src/shard.rs
@@ -40,7 +40,7 @@ pub use ::utils::shard::*;

/// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
/// and to check whether that [`ShardNumber`] is the same as the current shard.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardIdentity {
pub number: ShardNumber,
pub count: ShardCount,
@@ -49,7 +49,7 @@ pub struct ShardIdentity {
}

/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardStripeSize(pub u32);

impl Default for ShardStripeSize {
@@ -59,7 +59,7 @@ impl Default for ShardStripeSize {
}

/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardLayout(u8);

const LAYOUT_V1: ShardLayout = ShardLayout(1);
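A note on the new `Hash` derives above (an editorial sketch, not part of the diff): combined with the existing `Eq`, they let `ShardIdentity` serve as a hash-map key, which the decoder below relies on when it builds one serialized value batch per shard and then pulls each shard's batch back out. A minimal illustration, with a placeholder `Batch` type standing in for the real per-shard value:

use std::collections::HashMap;
use pageserver_api::shard::ShardIdentity;

// Placeholder for the real per-shard payload (a serialized value batch).
struct Batch;

fn demo(shard: ShardIdentity) {
    // Hash + Eq make ShardIdentity usable as a key.
    let mut per_shard: HashMap<ShardIdentity, Batch> = HashMap::new();
    per_shard.insert(shard, Batch);
    // This mirrors how the decoder hands each shard its own batch.
    let _batch = per_shard.remove(&shard);
}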
113 changes: 65 additions & 48 deletions libs/wal_decoder/src/decoder.rs
@@ -14,15 +14,15 @@ use utils::lsn::Lsn

impl InterpretedWalRecord {
/// Decode and interpret raw bytes which represent one Postgres WAL record.
/// Data blocks which do not match the provided shard identity are filtered out.
/// Data blocks which do not match any of the provided shard identities are filtered out.
/// Shard 0 is a special case since it tracks all relation sizes. We only give it
/// the keys that are being written as that is enough for updating relation sizes.
pub fn from_bytes_filtered(
buf: Bytes,
shard: &ShardIdentity,
shards: &Vec<ShardIdentity>,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<InterpretedWalRecord> {
) -> anyhow::Result<Vec<(ShardIdentity, InterpretedWalRecord)>> {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(buf, &mut decoded, pg_version)?;
let xid = decoded.xl_xid;
@@ -33,36 +33,43 @@ impl InterpretedWalRecord {
FlushUncommittedRecords::No
};

let metadata_record =
MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
let batch = SerializedValueBatch::from_decoded_filtered(
let metadata_per_shard =
MetadataRecord::from_decoded_filtered(&decoded, shards, next_record_lsn, pg_version)?;
let mut batches_per_shard = SerializedValueBatch::from_decoded_filtered(
decoded,
shard,
shards,
next_record_lsn,
pg_version,
)?;

Ok(InterpretedWalRecord {
metadata_record,
batch,
next_record_lsn,
flush_uncommitted,
xid,
})
let mut record_per_shard = Vec::with_capacity(shards.len());
for (shard_id, metadata_record) in metadata_per_shard {
let record = InterpretedWalRecord {
metadata_record,
batch: batches_per_shard.remove(&shard_id).unwrap(),
next_record_lsn,
flush_uncommitted,
xid,
};

record_per_shard.push((shard_id, record));
}

Ok(record_per_shard)
}
}

impl MetadataRecord {
/// Builds a metadata record for this WAL record, if any.
/// Builds metadata records for this WAL record for each of the specified shards, if any.
///
/// Only metadata records relevant for the given shard are emitted. Currently, most metadata
/// Only metadata records relevant for the given shards are emitted. Currently, most metadata
/// records are broadcast to all shards for simplicity, but this should be improved.
fn from_decoded_filtered(
decoded: &DecodedWALRecord,
shard: &ShardIdentity,
shards: &Vec<ShardIdentity>,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<Option<MetadataRecord>> {
) -> anyhow::Result<Vec<(ShardIdentity, Option<MetadataRecord>)>> {
// Note: this doesn't actually copy the bytes since
// the [`Bytes`] type implements it via a level of indirection.
let mut buf = decoded.record.clone();
@@ -112,41 +119,51 @@ impl MetadataRecord {
};

// Next, filter the metadata record by shard.
match metadata_record {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
{
metadata_record = None
let mut metadata_records_per_shard = Vec::with_capacity(shards.len());
for shard in shards {
let mut metadata_for_shard = metadata_record.clone();

match metadata_for_shard {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none()
&& clear_vm_bits.new_heap_blkno.is_none()
{
metadata_for_shard = None
}
}
Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
// Filter LogicalMessage records (AUX files) to only be stored on shard zero
if !shard.is_shard_zero() {
metadata_for_shard = None;
}
}
_ => {}
}

metadata_records_per_shard.push((*shard, metadata_for_shard));
}

Ok(metadata_record)
assert_eq!(metadata_records_per_shard.len(), shards.len());

Ok(metadata_records_per_shard)
}

fn decode_heapam_record(
2 changes: 1 addition & 1 deletion libs/wal_decoder/src/models.rs
@@ -48,7 +48,7 @@ pub mod proto {
tonic::include_proto!("interpreted_wal");
}

#[derive(Serialize, Deserialize)]
#[derive(Copy, Clone, Serialize, Deserialize)]
pub enum FlushUncommittedRecords {
Yes,
No,
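Why `FlushUncommittedRecords` gains `Copy, Clone` (an illustrative sketch, not from the diff): the decoder now assembles one `InterpretedWalRecord` per shard inside a loop, so the flush flag must be reusable across iterations rather than moved out on the first one. A standalone mirror of the enum:

// Minimal standalone copy of the enum, to show why Copy matters.
#[derive(Copy, Clone)]
enum FlushUncommittedRecords {
    Yes,
    No,
}

fn main() {
    let flush_uncommitted = FlushUncommittedRecords::No;
    for _shard in 0..4 {
        // Each iteration takes its own copy; without Copy (or an explicit
        // clone) this would be a use-after-move error.
        let _per_shard_flag = flush_uncommitted;
    }
    // Still usable after the loop because only copies were handed out.
    let _ = matches!(flush_uncommitted, FlushUncommittedRecords::Yes);
}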
