safekeeper: decode and interpret for multiple shards in one go #10201
base: main
Conversation
Problem

Currently, we call `InterpretedWalRecord::from_bytes_filtered` once per shard. To serve multiple shards at the same time, the API needs to allow enquiring about multiple shards in one call.

Summary of changes

This commit tweaks the API in a fairly brute-force way. Naively, we could just derive the shard for each key, but pre-split and post-split shards may be subscribed at the same time, so doing it efficiently is more complex.
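As a rough illustration of the API shape (the types and the exact return type here are stand-ins, not the real Neon signatures), the change moves from a per-shard call to a single call that covers all subscribed shards:

```rust
use std::collections::HashMap;

// Stand-ins for the real Neon types.
struct Bytes;
#[derive(Clone, PartialEq, Eq, Hash)]
struct ShardIdentity(u32);
struct InterpretedWalRecord;

// Before: each shard called this separately, re-decoding the same WAL bytes every time.
fn from_bytes_filtered_single(_buf: Bytes, _shard: &ShardIdentity) -> InterpretedWalRecord {
    unimplemented!()
}

// After: decode and interpret once, filtering for every subscribed shard in the same pass.
fn from_bytes_filtered_multi(
    _buf: Bytes,
    _shards: &[ShardIdentity],
) -> HashMap<ShardIdentity, InterpretedWalRecord> {
    unimplemented!()
}
```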
7095 tests run: 6797 passed, 0 failed, 298 skipped (full report)
Flaky tests (1): Postgres 17
Code coverage* (full report)
* collected from Rust tests only
The comment gets automatically updated with the latest test results.
cab7a14 at 2024-12-19T13:17:14.686Z :recycle:
We're adding a fair number of heap allocations here. I'll leave it to you to decide whether we need to optimize these now or leave it for later (benchmarks would be good).
 /// Shard 0 is a special case since it tracks all relation sizes. We only give it
 /// the keys that are being written as that is enough for updating relation sizes.
 pub fn from_bytes_filtered(
     buf: Bytes,
-    shard: &ShardIdentity,
+    shards: &Vec<ShardIdentity>,
nit: `&[ShardIdentity]` throughout.
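A small illustration of the suggestion, with a stand-in `ShardIdentity`: a slice parameter accepts a `&Vec`, an array, or a stack-allocated vector equally well, without the extra indirection of `&Vec<T>`:

```rust
struct ShardIdentity(u32);

// Taking a slice instead of &Vec keeps the signature flexible for callers.
fn from_bytes_filtered(shards: &[ShardIdentity]) -> usize {
    shards.len()
}

fn main() {
    let owned = vec![ShardIdentity(0), ShardIdentity(1)];
    assert_eq!(from_bytes_filtered(&owned), 2); // &Vec<T> coerces to &[T]
    assert_eq!(from_bytes_filtered(&[ShardIdentity(2)]), 1); // arrays work too
}
```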
// This duplicates some of the work below, but it's empirically much faster.
let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
let mut shard_batches = HashMap::with_capacity(shards.len());
As a follow-up, we should avoid all of these temporary allocations, as this is on the hot path. The easiest way to do that, which also allows reusing allocations across different timelines, is via an object pool -- I don't know which of the Rust implementations are any good, but there's a bunch of them.

In the meantime, we could avoid at least one allocation here by having the caller pass in a `&mut HashMap<ShardIdentity, SerializedValueBatch>` which the function populates for the given shards. Alternatively, use something like `smallvec`, which stack-allocates small vectors. This goes throughout.
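A minimal sketch of the caller-provided-map idea, using stand-in types for `ShardIdentity` and `SerializedValueBatch`: the caller keeps the map alive and the function only clears and refills it, so the allocation is reused across records (and could later come from a pool):

```rust
use std::collections::HashMap;

#[derive(Clone, PartialEq, Eq, Hash)]
struct ShardIdentity(u32);
#[derive(Default)]
struct SerializedValueBatch {
    raw: Vec<u8>,
}

// The function never allocates a fresh map; it fills the one the caller owns.
fn build_batches(
    shards: &[ShardIdentity],
    out: &mut HashMap<ShardIdentity, SerializedValueBatch>,
) {
    out.clear();
    for shard in shards {
        out.entry(shard.clone()).or_default();
    }
}

fn main() {
    let shards = vec![ShardIdentity(0), ShardIdentity(1)];
    let mut batches = HashMap::new();
    build_batches(&shards, &mut batches); // reused for every record, no fresh map
    assert_eq!(batches.len(), 2);
}
```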
metadata_record = None
let mut metadata_records_per_shard = Vec::with_capacity(shards.len());
for shard in shards {
    let mut metadata_for_shard = metadata_record.clone();
We can defer this clone until we know whether the record is relevant for this shard. This is mostly relevant for heap-allocated records -- I haven't checked if they would be frequent enough to matter.

We could also invert the logic here: if we passed in the shards as a `HashSet`, we would know which shards are relevant and could map each record directly to the right shard without checking all of them.
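A hedged sketch of deferring the clone; `is_relevant_for` and the surrounding types are hypothetical stand-ins for the real filtering logic, but the point is that shards which don't need the record never pay for the clone:

```rust
#[derive(Clone, Debug, PartialEq)]
struct MetadataRecord(String);
struct ShardIdentity(u32);

// Hypothetical relevance check; the real logic lives in the interpreter.
fn is_relevant_for(shard: &ShardIdentity, _record: &MetadataRecord) -> bool {
    shard.0 == 0 // placeholder: pretend only shard 0 cares
}

fn distribute(
    metadata_record: &Option<MetadataRecord>,
    shards: &[ShardIdentity],
) -> Vec<Option<MetadataRecord>> {
    shards
        .iter()
        .map(|shard| {
            metadata_record
                .as_ref()
                .filter(|r| is_relevant_for(shard, r)) // check relevance first...
                .cloned()                              // ...then clone lazily
        })
        .collect()
}

fn main() {
    let record = Some(MetadataRecord("checkpoint".into()));
    let shards = [ShardIdentity(0), ShardIdentity(1)];
    assert_eq!(distribute(&record, &shards), vec![record.clone(), None]);
}
```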
    metadata,
    max_lsn,
    len,
} = &mut batch;
nit: might be clearer to refer to e.g. `batch.raw` instead of destructuring these into local variables, as a reminder that we're writing directly to the batch here.
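A tiny illustration of the nit, with a stand-in batch type: writing through the batch fields keeps the mutation target visible at each write site instead of hiding it behind destructured locals.

```rust
struct Batch {
    raw: Vec<u8>,
    len: usize,
}

fn append(batch: &mut Batch, bytes: &[u8]) {
    batch.raw.extend_from_slice(bytes); // clearly writing into the batch
    batch.len += 1;
}

fn main() {
    let mut batch = Batch { raw: Vec::new(), len: 0 };
    append(&mut batch, b"value");
    assert_eq!((batch.raw.len(), batch.len), (5, 1));
}
```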
@@ -312,12 +314,16 @@ async fn import_wal(
     let mut modification = tline.begin_modification(last_lsn);
     while last_lsn <= endpoint {
         if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-            let interpreted = InterpretedWalRecord::from_bytes_filtered(
+            let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
nit: might be worth adding a convenience method for the single-shard case, since we end up doing this at a bunch of call sites.
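A possible shape for such a helper (the names and signatures here are hypothetical, not the actual Neon API): wrap the multi-shard call and pull out the single entry, so single-shard call sites stay tidy.

```rust
use std::collections::HashMap;

// Stand-ins for the real Neon types and for the multi-shard API from this PR.
#[derive(Clone, PartialEq, Eq, Hash)]
struct ShardIdentity(u32);
#[derive(Debug)]
struct InterpretedWalRecord;
struct Bytes;

fn from_bytes_filtered(
    _buf: Bytes,
    shards: &[ShardIdentity],
) -> HashMap<ShardIdentity, InterpretedWalRecord> {
    shards.iter().map(|s| (s.clone(), InterpretedWalRecord)).collect()
}

// Hypothetical convenience wrapper for the single-shard call sites.
fn from_bytes_filtered_one(buf: Bytes, shard: &ShardIdentity) -> InterpretedWalRecord {
    from_bytes_filtered(buf, std::slice::from_ref(shard))
        .remove(shard)
        .expect("requested shard is always present in the result")
}

fn main() {
    let shard = ShardIdentity(0);
    let interpreted = from_bytes_filtered_one(Bytes, &shard);
    println!("{:?}", interpreted);
}
```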
Yeah, I felt quite naughty writing this stuff. I'll dust off my benchmark to see how much we need to optimise here.