From 45444f27caf6c28dba1601e77fd908026bfc9ee4 Mon Sep 17 00:00:00 2001 From: usamoi Date: Tue, 24 Dec 2024 17:04:39 +0800 Subject: [PATCH] [experimental] feat: pinning index in memory when building Signed-off-by: usamoi --- Cargo.lock | 10 ++ Cargo.toml | 1 + src/postgres.rs | 2 - src/vchordrq/index/am.rs | 201 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 208 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9439996..999610f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,15 @@ dependencies = [ "syn 2.0.91", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.70.1" @@ -1599,6 +1608,7 @@ name = "vchord" version = "0.0.0" dependencies = [ "base", + "bincode", "half 2.4.1", "log", "nalgebra", diff --git a/Cargo.toml b/Cargo.toml index 2697ad1..92fd5bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ nalgebra = "=0.33.0" # lock rkyv version forever so that data is always compatible rkyv = { version = "=0.7.45", features = ["validation"] } +bincode = "1.3.3" half = { version = "2.4.1", features = ["rkyv"] } log = "0.4.22" paste = "1" diff --git a/src/postgres.rs b/src/postgres.rs index c24d669..1db7e37 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -47,7 +47,6 @@ impl Page { assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize); this } - #[allow(dead_code)] pub fn clone_into_boxed(&self) -> Box { let mut result = Box::new_uninit(); unsafe { @@ -357,7 +356,6 @@ impl Relation { } } } - #[allow(dead_code)] pub fn len(&self) -> u32 { unsafe { pgrx::pg_sys::RelationGetNumberOfBlocksInFork( diff --git a/src/vchordrq/index/am.rs b/src/vchordrq/index/am.rs index 1dc7cdc..9a4e06b 100644 --- a/src/vchordrq/index/am.rs +++ b/src/vchordrq/index/am.rs @@ -1,7 +1,8 @@ -use crate::postgres::Relation; -use crate::vchordrq::algorithm; +use crate::postgres::{Page, Relation}; use crate::vchordrq::algorithm::build::{HeapRelation, Reporter}; use crate::vchordrq::algorithm::tuples::Vector; +use crate::vchordrq::algorithm::{self, PageGuard}; +use crate::vchordrq::algorithm::{RelationRead, RelationWrite}; use crate::vchordrq::index::am_options::{Opfamily, Reloption}; use crate::vchordrq::index::am_scan::Scanner; use crate::vchordrq::index::utils::{ctid_to_pointer, pointer_to_ctid}; @@ -12,6 +13,8 @@ use base::vector::VectOwned; use half::f16; use pgrx::datum::Internal; use pgrx::pg_sys::Datum; +use std::collections::HashMap; +use std::ops::Deref; static mut RELOPT_KIND_VCHORDRQ: pgrx::pg_sys::relopt_kind::Type = 0; @@ -259,7 +262,52 @@ pub unsafe extern "C" fn ambuild( reporter.clone(), ), } - if let Some(leader) = unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent) } + let cache = { + let n = index_relation.len(); + let mut dir = HashMap::::with_capacity(n as _); + let mut pages = Vec::>::new(); + { + use crate::vchordrq::algorithm::tuples::{Height1Tuple, MetaTuple}; + let mut read = |id| { + let result = index_relation.read(id); + dir.insert(id, pages.len()); + pages.push(result.clone_into_boxed()); + result + }; + let meta_guard = read(0); + let meta_tuple = meta_guard + .get(1) + .map(rkyv::check_archived_root::) + .expect("data corruption") + .expect("data corruption"); + let mut firsts = vec![meta_tuple.first]; + let mut make_firsts = |firsts| { + let mut results = Vec::new(); + for first in firsts { + let mut current = first; + while current != u32::MAX { + let h1_guard = read(current); + for i in 1..=h1_guard.len() { + let h1_tuple = h1_guard + .get(i) + .map(rkyv::check_archived_root::) + .expect("data corruption") + .expect("data corruption"); + results.push(h1_tuple.first); + } + current = h1_guard.get_opaque().next; + } + } + results + }; + for _ in (1..meta_tuple.height_of_root).rev() { + firsts = make_firsts(firsts); + } + } + (dir, pages) + }; + if let Some(leader) = + unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent, &cache) } { unsafe { parallel_build( @@ -269,6 +317,8 @@ pub unsafe extern "C" fn ambuild( leader.tablescandesc, leader.vchordrqshared, Some(reporter), + &*leader.cache_0, + &*leader.cache_1, ); leader.wait(); let nparticipants = leader.nparticipants; @@ -290,6 +340,10 @@ pub unsafe extern "C" fn ambuild( let mut indtuples = 0; reporter.tuples_done(indtuples); let relation = unsafe { Relation::new(index) }; + let relation = CachingRelation { + cache: &(cache.0, cache.1.iter().map(|x| x.as_ref()).collect()), + relation, + }; match opfamily.vector_kind() { VectorKind::Vecf32 => { HeapRelation::>::traverse( @@ -330,11 +384,78 @@ pub unsafe extern "C" fn ambuild( unsafe { pgrx::pgbox::PgBox::::alloc0().into_pg() } } +#[derive(Clone)] +struct CachingRelation<'a, R> { + cache: &'a (HashMap, Vec<&'a Page>), + relation: R, +} + +enum CachingRelationReadGuard<'a, G> { + Wrapping(G), + Cached(u32, &'a Page), +} + +impl PageGuard for CachingRelationReadGuard<'_, G> { + fn id(&self) -> u32 { + match self { + CachingRelationReadGuard::Wrapping(x) => x.id(), + CachingRelationReadGuard::Cached(id, _) => *id, + } + } +} + +impl> Deref for CachingRelationReadGuard<'_, G> { + type Target = Page; + + fn deref(&self) -> &Self::Target { + match self { + CachingRelationReadGuard::Wrapping(x) => x, + CachingRelationReadGuard::Cached(_, page) => page, + } + } +} + +impl RelationRead for CachingRelation<'_, R> { + type ReadGuard<'a> + = CachingRelationReadGuard<'a, R::ReadGuard<'a>> + where + Self: 'a; + + fn read(&self, id: u32) -> Self::ReadGuard<'_> { + if let Some(&x) = self.cache.0.get(&id) { + CachingRelationReadGuard::Cached(id, self.cache.1[x]) + } else { + CachingRelationReadGuard::Wrapping(self.relation.read(id)) + } + } +} + +impl RelationWrite for CachingRelation<'_, R> { + type WriteGuard<'a> + = R::WriteGuard<'a> + where + Self: 'a; + + fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.relation.write(id, tracking_freespace) + } + + fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.relation.extend(tracking_freespace) + } + + fn search(&self, freespace: usize) -> Option> { + self.relation.search(freespace) + } +} + struct VchordrqShared { /* Immutable state */ heaprelid: pgrx::pg_sys::Oid, indexrelid: pgrx::pg_sys::Oid, isconcurrent: bool, + est_cache_0: usize, + est_cache_1: usize, /* Worker progress */ workersdonecv: pgrx::pg_sys::ConditionVariable, @@ -360,6 +481,8 @@ struct VchordrqLeader { nparticipants: i32, vchordrqshared: *mut VchordrqShared, tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData, + cache_0: *const [u8], + cache_1: *const [u8], snapshot: pgrx::pg_sys::Snapshot, } @@ -368,7 +491,10 @@ impl VchordrqLeader { heap: pgrx::pg_sys::Relation, index: pgrx::pg_sys::Relation, isconcurrent: bool, + cache: &(HashMap, Vec>), ) -> Option { + let cache_mapping: Vec = bincode::serialize(&cache.0).unwrap(); + unsafe fn compute_parallel_workers( heap: pgrx::pg_sys::Relation, index: pgrx::pg_sys::Relation, @@ -419,11 +545,17 @@ impl VchordrqLeader { } let est_tablescandesc = unsafe { pgrx::pg_sys::table_parallelscan_estimate(heap, snapshot) }; + let est_cache_0 = cache_mapping.len(); + let est_cache_1 = cache.1.len() * size_of::(); unsafe { estimate_chunk(&mut (*pcxt).estimator, size_of::()); estimate_keys(&mut (*pcxt).estimator, 1); estimate_chunk(&mut (*pcxt).estimator, est_tablescandesc); estimate_keys(&mut (*pcxt).estimator, 1); + estimate_chunk(&mut (*pcxt).estimator, est_cache_0); + estimate_keys(&mut (*pcxt).estimator, 1); + estimate_chunk(&mut (*pcxt).estimator, est_cache_1); + estimate_keys(&mut (*pcxt).estimator, 1); } unsafe { @@ -450,6 +582,8 @@ impl VchordrqLeader { mutex: std::mem::zeroed(), nparticipantsdone: 0, indtuples: 0, + est_cache_0, + est_cache_1, }); pgrx::pg_sys::ConditionVariableInit(&raw mut (*vchordrqshared).workersdonecv); pgrx::pg_sys::SpinLockInit(&raw mut (*vchordrqshared).mutex); @@ -463,9 +597,29 @@ impl VchordrqLeader { tablescandesc }; + let cache_0 = unsafe { + let cache_0 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_0).cast::(); + std::ptr::copy(cache_mapping.as_ptr(), cache_0, est_cache_0); + core::ptr::slice_from_raw_parts(cache_0, est_cache_0) + }; + + let cache_1 = unsafe { + let cache_1 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_1).cast::(); + for i in 0..cache.1.len() { + std::ptr::copy( + (cache.1[i].deref() as *const Page).cast::(), + cache_1.cast::().add(i).cast(), + size_of::(), + ); + } + core::ptr::slice_from_raw_parts(cache_1, est_cache_1) + }; + unsafe { pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000001, vchordrqshared.cast()); pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000002, tablescandesc.cast()); + pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000003, cache_0 as _); + pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000004, cache_1 as _); } unsafe { @@ -491,6 +645,8 @@ impl VchordrqLeader { nparticipants: nworkers_launched + 1, vchordrqshared, tablescandesc, + cache_0, + cache_1, snapshot, }) } @@ -530,6 +686,18 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main( pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000002, false) .cast::() }; + let cache_0 = unsafe { + std::slice::from_raw_parts( + pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000003, false).cast::(), + (*vchordrqshared).est_cache_0, + ) + }; + let cache_1 = unsafe { + std::slice::from_raw_parts( + pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000004, false).cast::(), + (*vchordrqshared).est_cache_1, + ) + }; let heap_lockmode; let index_lockmode; if unsafe { !(*vchordrqshared).isconcurrent } { @@ -547,7 +715,16 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main( } unsafe { - parallel_build(index, heap, index_info, tablescandesc, vchordrqshared, None); + parallel_build( + index, + heap, + index_info, + tablescandesc, + vchordrqshared, + None, + cache_0, + cache_1, + ); } unsafe { @@ -563,6 +740,8 @@ unsafe fn parallel_build( tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData, vchordrqshared: *mut VchordrqShared, mut reporter: Option, + cache_0: &[u8], + cache_1: &[u8], ) { #[derive(Debug, Clone)] pub struct Heap { @@ -628,6 +807,20 @@ unsafe fn parallel_build( } let index_relation = unsafe { Relation::new(index) }; + let index_relation = CachingRelation { + cache: { + let cache_0: HashMap = bincode::deserialize(cache_0).unwrap(); + assert!(cache_1.len() % size_of::() == 0); + let n = cache_1.len() / size_of::(); + let cache_1 = unsafe { + (0..n) + .map(|i| &*cache_1.as_ptr().cast::().add(i)) + .collect::>() + }; + &(cache_0, cache_1) + }, + relation: index_relation, + }; let scan = unsafe { pgrx::pg_sys::table_beginscan_parallel(heap, tablescandesc) }; let opfamily = unsafe { am_options::opfamily(index) };