modify geyser account iter at snapshot load #960

Merged Apr 22, 2024 (1 commit)
46 changes: 36 additions & 10 deletions accounts-db/src/accounts_db/geyser_plugin_utils.rs
@@ -89,40 +89,65 @@ impl AccountsDb {
    ) {
        let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();

-       let mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta> = HashMap::default();
+       let mut accounts_duplicate: HashMap<Pubkey, usize> = HashMap::default();
        let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
-       let accounts = storage_entry.accounts.account_iter();
        let mut account_len = 0;
-       accounts.for_each(|account| {
+       let mut pubkeys = HashSet::new();
+
+       // Populate `accounts_duplicate` for any pubkeys that are in this storage twice.
+       // Storages cannot return `StoredAccountMeta<'_>` for more than one account at a time,
+       // so we have to do two passes to make sure we don't have duplicate pubkeys.
+       let mut i = 0;
+       storage_entry.accounts.scan_pubkeys(|pubkey| {
+           i += 1; // pre-increment to most easily match early returns in the next loop
+           if !pubkeys.insert(*pubkey) {
+               accounts_duplicate.insert(*pubkey, i); // remember the highest-index entry in this slot
+           }
+       });
+
+       // now, actually notify geyser
+       let mut i = 0;
+       storage_entry.accounts.scan_accounts(|account| {
+           i += 1;
            account_len += 1;
            if notified_accounts.contains(account.pubkey()) {
                notify_stats.skipped_accounts += 1;
                return;
            }
+           if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) {
+               if highest_i != &i {
+                   // This pubkey is in this storage twice and the current instance is not
+                   // the last one, so we skip it. We only send unique accounts in this
+                   // slot to `notify_filtered_accounts`.
+                   return;
+               }
+           }
Comment on lines +117 to +122

Reviewer: Do we need to move where `account_len` is incremented? Or maybe decrement it before we call `return`? Otherwise I think we'll over-count the number of accounts if there are duplicates.

Author: I wondered this, too. In the previous impl, we incremented `account_len` for each account in `accounts.for_each(|account| {`, which would have included duplicates already. The impl in this PR should have exactly the same behavior as the previous impl.

Reviewer: Gotcha, that's fair. It won't be an issue anywhere until we have ancient append vecs, and if we make pack the default too, then by then we'll never hit this issue (and could remove this duplicates handling entirely).

Author: Even still, ancient append vecs should in practice never have duplicates, but it is possible, I think.

+           }

            // later entries in the same slot are more recent and override earlier accounts for the same pubkey
+           // We could pass an incrementing number here for write_version in the future, if the
+           // storage does not have a write_version, as long as all accounts for this slot are
+           // in one append vec that can be iterated oldest to newest.
-           accounts_to_stream.insert(*account.pubkey(), account);
+           self.notify_filtered_accounts(
+               slot,
+               notified_accounts,
+               std::iter::once(account),
+               notify_stats,
+           );
        });
        notify_stats.total_accounts += account_len;
        measure_filter.stop();
        notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
-
-       self.notify_filtered_accounts(slot, notified_accounts, accounts_to_stream, notify_stats);
    }

-   fn notify_filtered_accounts(
+   fn notify_filtered_accounts<'a>(
        &self,
        slot: Slot,
        notified_accounts: &mut HashSet<Pubkey>,
-       mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta>,
+       accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>,
        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
    ) {
        let notifier = self.accounts_update_notifier.as_ref().unwrap();
        let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
        let local_write_version = 0;
-       for (_, mut account) in accounts_to_stream.drain() {
+       for mut account in accounts_to_stream {
            // We do not need to rely on the specific write_version read from the append vec.
            // So, overwrite the write_version with something that works.
-           // 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey.
@@ -143,8 +168,9 @@ impl AccountsDb {
            notified_accounts.insert(*account.pubkey());
            measure_bookkeep.stop();
            notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;

+           notify_stats.notified_accounts += 1;
        }
-       notify_stats.notified_accounts += accounts_to_stream.len();
        measure_notify.stop();
        notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
    }