safekeeper: decode and interpret for multiple shards in one go #10201

Open
wants to merge 8 commits into base: main
6 changes: 3 additions & 3 deletions libs/pageserver_api/src/shard.rs
@@ -40,7 +40,7 @@ pub use ::utils::shard::*;

/// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
/// and to check whether that [`ShardNumber`] is the same as the current shard.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardIdentity {
pub number: ShardNumber,
pub count: ShardCount,
@@ -49,7 +49,7 @@ pub struct ShardIdentity {
}

/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardStripeSize(pub u32);

impl Default for ShardStripeSize {
@@ -59,7 +59,7 @@ impl Default for ShardStripeSize {
}

/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardLayout(u8);

const LAYOUT_V1: ShardLayout = ShardLayout(1);
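The `Hash` derives matter because the decoder changes below key per-shard state by `ShardIdentity` (for example, `batches_per_shard.remove(&shard_id)`), and a `HashMap` key must implement `Eq + Hash`. A minimal, self-contained sketch of that requirement, using a stand-in struct rather than the real `ShardIdentity`:

```rust
use std::collections::HashMap;

// Stand-in for pageserver_api::shard::ShardIdentity; the real type carries
// more fields, but the constraint is the same: HashMap keys need Eq + Hash.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ShardKey {
    number: u8,
    count: u8,
}

fn main() {
    let shards = [
        ShardKey { number: 0, count: 2 },
        ShardKey { number: 1, count: 2 },
    ];

    // One value batch per shard, keyed by the shard identity.
    let mut batches_per_shard: HashMap<ShardKey, Vec<&str>> = HashMap::new();
    for shard in shards {
        batches_per_shard.entry(shard).or_default().push("record");
    }

    // The decoder below does the equivalent of this when it hands each shard
    // its own batch.
    let batch = batches_per_shard.remove(&shards[0]).unwrap();
    assert_eq!(batch, vec!["record"]);
}
```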
14 changes: 7 additions & 7 deletions libs/postgres_ffi/src/walrecord.rs
@@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;

#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactCreate {
pub mid: MultiXactId,
/* new MultiXact's ID */
@@ -46,7 +46,7 @@ impl XlMultiXactCreate {
}

#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactTruncate {
pub oldest_multi_db: Oid,
/* to-be-truncated range of multixact offsets */
@@ -72,7 +72,7 @@ impl XlMultiXactTruncate {
}

#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlRelmapUpdate {
pub dbid: Oid, /* database ID, or 0 for shared map */
pub tsid: Oid, /* database's tablespace, or pg_global */
@@ -90,7 +90,7 @@ impl XlRelmapUpdate {
}

#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlReploriginDrop {
pub node_id: RepOriginId,
}
@@ -104,7 +104,7 @@ impl XlReploriginDrop {
}

#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlReploriginSet {
pub remote_lsn: Lsn,
pub node_id: RepOriginId,
@@ -911,7 +911,7 @@ impl XlSmgrCreate {
}

#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlSmgrTruncate {
pub blkno: BlockNumber,
pub rnode: RelFileNode,
@@ -984,7 +984,7 @@ impl XlDropDatabase {
/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
/// struct for commits and aborts.
///
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlXactParsedRecord {
pub xid: TransactionId,
pub info: u8,
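The `Clone` additions support the decoder change below: a single decoded WAL record is now turned into one metadata record per shard, so the parsed structs have to be duplicable (`metadata_record.clone()` in the per-shard loop). A rough sketch of that pattern, with a toy record type standing in for the real ones:

```rust
// Toy stand-in for the WAL metadata structs that gain Clone above
// (XlMultiXactCreate, XlSmgrTruncate, XlXactParsedRecord, ...).
#[derive(Clone, Debug, PartialEq)]
struct FakeMetadataRecord {
    xid: u32,
}

fn main() {
    let template = Some(FakeMetadataRecord { xid: 42 });

    // One independent copy per shard, so each shard can filter or drop its
    // copy without affecting the others.
    let per_shard: Vec<(u8, Option<FakeMetadataRecord>)> = (0..4u8)
        .map(|shard| (shard, template.clone()))
        .collect();

    assert_eq!(per_shard.len(), 4);
    assert_eq!(per_shard[3].1, Some(FakeMetadataRecord { xid: 42 }));
}
```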
113 changes: 65 additions & 48 deletions libs/wal_decoder/src/decoder.rs
@@ -14,15 +14,15 @@ use utils::lsn::Lsn;

impl InterpretedWalRecord {
/// Decode and interpret raw bytes which represent one Postgres WAL record.
/// Data blocks which do not match the provided shard identity are filtered out.
/// Data blocks which do not match any of the provided shard identities are filtered out.
/// Shard 0 is a special case since it tracks all relation sizes. We only give it
/// the keys that are being written, as that is enough for updating relation sizes.
pub fn from_bytes_filtered(
buf: Bytes,
shard: &ShardIdentity,
shards: &Vec<ShardIdentity>,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<InterpretedWalRecord> {
) -> anyhow::Result<Vec<(ShardIdentity, InterpretedWalRecord)>> {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(buf, &mut decoded, pg_version)?;
let xid = decoded.xl_xid;
@@ -33,36 +33,43 @@ impl InterpretedWalRecord {
FlushUncommittedRecords::No
};

let metadata_record =
MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
let batch = SerializedValueBatch::from_decoded_filtered(
let metadata_per_shard =
MetadataRecord::from_decoded_filtered(&decoded, shards, next_record_lsn, pg_version)?;
let mut batches_per_shard = SerializedValueBatch::from_decoded_filtered(
decoded,
shard,
shards,
next_record_lsn,
pg_version,
)?;

Ok(InterpretedWalRecord {
metadata_record,
batch,
next_record_lsn,
flush_uncommitted,
xid,
})
let mut record_per_shard = Vec::with_capacity(shards.len());
for (shard_id, metadata_record) in metadata_per_shard {
let record = InterpretedWalRecord {
metadata_record,
batch: batches_per_shard.remove(&shard_id).unwrap(),
next_record_lsn,
flush_uncommitted,
xid,
};

record_per_shard.push((shard_id, record));
}

Ok(record_per_shard)
}
}
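To make the new shape of the API concrete: the WAL record is decoded once, interpreted for every shard in the same pass, and the caller receives one `(ShardIdentity, InterpretedWalRecord)` pair per shard. Below is a simplified, self-contained model of that fan-out; `decode_once`, `interpret_for_shard`, and the toy types are illustrative stand-ins, not the crate's API.

```rust
// Simplified stand-ins for DecodedWALRecord / ShardIdentity / InterpretedWalRecord.
#[derive(Clone, Copy, Debug)]
struct Shard(u8);

struct Decoded {
    payload: Vec<u8>,
}

#[derive(Debug)]
struct Interpreted {
    shard: u8,
    bytes_kept: usize,
}

// Decoding happens exactly once per WAL record.
fn decode_once(raw: &[u8]) -> Decoded {
    Decoded {
        payload: raw.to_vec(),
    }
}

// Hypothetical per-shard filter: keep only the bytes this shard "owns".
fn interpret_for_shard(decoded: &Decoded, shard: Shard, shard_count: u8) -> Interpreted {
    let bytes_kept = decoded
        .payload
        .iter()
        .enumerate()
        .filter(|(i, _)| (*i as u8) % shard_count == shard.0)
        .count();
    Interpreted {
        shard: shard.0,
        bytes_kept,
    }
}

fn main() {
    let raw = b"some wal record bytes";
    let shards = [Shard(0), Shard(1)];

    // Decode once...
    let decoded = decode_once(raw);

    // ...then interpret for all shards in one go, which is the saving this PR
    // is after compared to decoding the same record once per shard.
    let per_shard: Vec<Interpreted> = shards
        .iter()
        .map(|&s| interpret_for_shard(&decoded, s, shards.len() as u8))
        .collect();

    assert_eq!(per_shard.len(), shards.len());
    println!("{per_shard:?}");
}
```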

impl MetadataRecord {
/// Builds a metadata record for this WAL record, if any.
/// Builds metadata records for this WAL record for each of the specified shards, if any.
///
/// Only metadata records relevant for the given shard are emitted. Currently, most metadata
/// Only metadata records relevant to the given shards are emitted. Currently, most metadata
/// records are broadcast to all shards for simplicity, but this should be improved.
fn from_decoded_filtered(
decoded: &DecodedWALRecord,
shard: &ShardIdentity,
shards: &Vec<ShardIdentity>,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<Option<MetadataRecord>> {
) -> anyhow::Result<Vec<(ShardIdentity, Option<MetadataRecord>)>> {
// Note: this doesn't actually copy the bytes since
// the [`Bytes`] type implements it via a level of indirection.
let mut buf = decoded.record.clone();
@@ -112,41 +119,51 @@ impl MetadataRecord {
};

// Next, filter the metadata record by shard.
match metadata_record {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
{
metadata_record = None
let mut metadata_records_per_shard = Vec::with_capacity(shards.len());
for shard in shards {
let mut metadata_for_shard = metadata_record.clone();

match metadata_for_shard {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none()
&& clear_vm_bits.new_heap_blkno.is_none()
{
metadata_for_shard = None
}
}
}
Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
// Filter LogicalMessage records (AUX files) to only be stored on shard zero
if !shard.is_shard_zero() {
metadata_record = None;
Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
// Filter LogicalMessage records (AUX files) to only be stored on shard zero
if !shard.is_shard_zero() {
metadata_for_shard = None;
}
}
_ => {}
}
_ => {}

metadata_records_per_shard.push((*shard, metadata_for_shard));
}

Ok(metadata_record)
assert_eq!(metadata_records_per_shard.len(), shards.len());

Ok(metadata_records_per_shard)
}
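For intuition about the `ClearVmBits` routing in the loop above: the old and new heap block numbers may map to different visibility-map pages, and each VM page may be owned by a different shard, so the two block numbers are filtered independently and the record is dropped for a shard only if neither survives. A toy sketch of that decision, with a made-up blocks-per-VM-page constant and a made-up ownership rule standing in for `pg_constants::HEAPBLK_TO_MAPBLOCK` and `ShardIdentity::is_key_local`:

```rust
// Made-up constant purely for illustration; not Postgres' real layout.
const HEAPBLOCKS_PER_VM_PAGE: u32 = 32_768;

fn heapblk_to_mapblock(heap_blk: u32) -> u32 {
    heap_blk / HEAPBLOCKS_PER_VM_PAGE
}

// Toy ownership rule: stripe VM pages across shards round-robin.
fn owns_vm_page(shard: u32, shard_count: u32, vm_blk: u32) -> bool {
    vm_blk % shard_count == shard
}

fn main() {
    let (shard, shard_count) = (1, 4);
    let old_heap_blkno: Option<u32> = Some(10); // maps to VM page 0
    let new_heap_blkno: Option<u32> = Some(40_000); // maps to VM page 1

    // Mirror of the filter in the decoder: each block number is kept only if
    // this shard owns the VM page it maps to.
    let keep = |blkno: u32| owns_vm_page(shard, shard_count, heapblk_to_mapblock(blkno));
    let old_kept = old_heap_blkno.filter(|&b| keep(b));
    let new_kept = new_heap_blkno.filter(|&b| keep(b));

    // If neither survives, the whole ClearVmBits record is dropped for this shard.
    let drop_record = old_kept.is_none() && new_kept.is_none();
    println!("old={old_kept:?} new={new_kept:?} drop={drop_record}");
}
```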

fn decode_heapam_record(