diff --git a/Cargo.lock b/Cargo.lock index 9439996..999610f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,15 @@ dependencies = [ "syn 2.0.91", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.70.1" @@ -1599,6 +1608,7 @@ name = "vchord" version = "0.0.0" dependencies = [ "base", + "bincode", "half 2.4.1", "log", "nalgebra", diff --git a/Cargo.toml b/Cargo.toml index 2697ad1..92fd5bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ nalgebra = "=0.33.0" # lock rkyv version forever so that data is always compatible rkyv = { version = "=0.7.45", features = ["validation"] } +bincode = "1.3.3" half = { version = "2.4.1", features = ["rkyv"] } log = "0.4.22" paste = "1" diff --git a/src/postgres.rs b/src/postgres.rs index 06b402d..1db7e37 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -1,4 +1,5 @@ use std::mem::{offset_of, MaybeUninit}; +use std::ops::{Deref, DerefMut}; use std::ptr::NonNull; const _: () = assert!( @@ -46,6 +47,13 @@ impl Page { assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize); this } + pub fn clone_into_boxed(&self) -> Box { + let mut result = Box::new_uninit(); + unsafe { + std::ptr::copy(self as *const Self, result.as_mut_ptr(), 1); + result.assume_init() + } + } pub fn get_opaque(&self) -> &Opaque { &self.opaque } @@ -170,15 +178,20 @@ pub struct BufferReadGuard { } impl BufferReadGuard { - pub fn get(&self) -> &Page { - unsafe { self.page.as_ref() } - } #[allow(dead_code)] pub fn id(&self) -> u32 { self.id } } +impl Deref for BufferReadGuard { + type Target = Page; + + fn deref(&self) -> &Page { + unsafe { self.page.as_ref() } + } +} + impl Drop for BufferReadGuard { fn drop(&mut self) { unsafe { @@ -197,15 +210,23 @@ pub struct BufferWriteGuard { } impl BufferWriteGuard { - pub fn get(&self) -> &Page { + pub fn id(&self) -> u32 { + self.id + } +} + +impl Deref for BufferWriteGuard { + type Target = Page; + + fn deref(&self) -> &Page { unsafe { self.page.as_ref() } } - pub fn get_mut(&mut self) -> &mut Page { +} + +impl DerefMut for BufferWriteGuard { + fn deref_mut(&mut self) -> &mut Page { unsafe { self.page.as_mut() } } - pub fn id(&self) -> u32 { - self.id - } } impl Drop for BufferWriteGuard { @@ -215,11 +236,7 @@ impl Drop for BufferWriteGuard { pgrx::pg_sys::GenericXLogAbort(self.state); } else { if self.tracking_freespace { - pgrx::pg_sys::RecordPageWithFreeSpace( - self.raw, - self.id, - self.get().freespace() as _, - ); + pgrx::pg_sys::RecordPageWithFreeSpace(self.raw, self.id, self.freespace() as _); pgrx::pg_sys::FreeSpaceMapVacuumRange(self.raw, self.id, self.id + 1); } pgrx::pg_sys::GenericXLogFinish(self.state); @@ -329,13 +346,9 @@ impl Relation { return None; } let write = self.write(id, true); - if write.get().freespace() < freespace as _ { + if write.freespace() < freespace as _ { // the free space is recorded incorrectly - pgrx::pg_sys::RecordPageWithFreeSpace( - self.raw, - id, - write.get().freespace() as _, - ); + pgrx::pg_sys::RecordPageWithFreeSpace(self.raw, id, write.freespace() as _); pgrx::pg_sys::FreeSpaceMapVacuumRange(self.raw, id, id + 1); continue; } @@ -343,4 +356,12 @@ impl Relation { } } } + pub fn len(&self) -> u32 { + unsafe { + pgrx::pg_sys::RelationGetNumberOfBlocksInFork( + self.raw, + pgrx::pg_sys::ForkNumber::MAIN_FORKNUM, + ) + } + } } diff --git 
a/src/vchordrq/algorithm/build.rs b/src/vchordrq/algorithm/build.rs index 06f835e..6554bfc 100644 --- a/src/vchordrq/algorithm/build.rs +++ b/src/vchordrq/algorithm/build.rs @@ -1,7 +1,7 @@ -use crate::postgres::BufferWriteGuard; -use crate::postgres::Relation; +use super::RelationWrite; use crate::vchordrq::algorithm::rabitq; use crate::vchordrq::algorithm::tuples::*; +use crate::vchordrq::algorithm::PageGuard; use crate::vchordrq::index::am_options::Opfamily; use crate::vchordrq::types::VchordrqBuildOptions; use crate::vchordrq::types::VchordrqExternalBuildOptions; @@ -32,7 +32,7 @@ pub fn build, R: Reporter>( vector_options: VectorOptions, vchordrq_options: VchordrqIndexingOptions, heap_relation: T, - relation: Relation, + relation: impl RelationWrite, mut reporter: R, ) { let dims = vector_options.dims; @@ -75,7 +75,7 @@ pub fn build, R: Reporter>( }; let mut meta = Tape::create(&relation, false); assert_eq!(meta.first(), 0); - let mut vectors = Tape::>::create(&relation, true); + let mut vectors = Tape::, _>::create(&relation, true); let mut pointer_of_means = Vec::>::new(); for i in 0..structures.len() { let mut level = Vec::new(); @@ -99,10 +99,10 @@ pub fn build, R: Reporter>( let mut level = Vec::new(); for j in 0..structures[i].len() { if i == 0 { - let tape = Tape::::create(&relation, false); + let tape = Tape::::create(&relation, false); level.push(tape.first()); } else { - let mut tape = Tape::::create(&relation, false); + let mut tape = Tape::::create(&relation, false); let h2_mean = &structures[i].means[j]; let h2_children = &structures[i].children[j]; for child in h2_children.iter().copied() { @@ -361,18 +361,18 @@ impl Structure { } } -struct Tape<'a, T> { - relation: &'a Relation, - head: BufferWriteGuard, +struct Tape<'a: 'b, 'b, T, R: 'b + RelationWrite> { + relation: &'a R, + head: R::WriteGuard<'b>, first: u32, tracking_freespace: bool, _phantom: PhantomData T>, } -impl<'a, T> Tape<'a, T> { - fn create(relation: &'a Relation, tracking_freespace: bool) -> Self { +impl<'a: 'b, 'b, T, R: 'b + RelationWrite> Tape<'a, 'b, T, R> { + fn create(relation: &'a R, tracking_freespace: bool) -> Self { let mut head = relation.extend(tracking_freespace); - head.get_mut().get_opaque_mut().skip = head.id(); + head.get_opaque_mut().skip = head.id(); let first = head.id(); Self { relation, @@ -387,19 +387,19 @@ impl<'a, T> Tape<'a, T> { } } -impl Tape<'_, T> +impl<'a: 'b, 'b, T, R: 'b + RelationWrite> Tape<'a, 'b, T, R> where T: rkyv::Serialize>, { fn push(&mut self, x: &T) -> (u32, u16) { let bytes = rkyv::to_bytes(x).expect("failed to serialize"); - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { let next = self.relation.extend(self.tracking_freespace); - self.head.get_mut().get_opaque_mut().next = next.id(); + self.head.get_opaque_mut().next = next.id(); self.head = next; - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { panic!("tuple is too large to fit in a fresh page") diff --git a/src/vchordrq/algorithm/insert.rs b/src/vchordrq/algorithm/insert.rs index ee47e9a..323fa24 100644 --- a/src/vchordrq/algorithm/insert.rs +++ b/src/vchordrq/algorithm/insert.rs @@ -1,7 +1,8 @@ -use crate::postgres::Relation; +use super::RelationWrite; use crate::vchordrq::algorithm::rabitq::fscan_process_lowerbound; use crate::vchordrq::algorithm::tuples::*; use crate::vchordrq::algorithm::vectors; +use crate::vchordrq::algorithm::PageGuard; use 
base::always_equal::AlwaysEqual; use base::distance::Distance; use base::distance::DistanceKind; @@ -11,7 +12,7 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; pub fn insert( - relation: Relation, + relation: impl RelationWrite + Clone, payload: Pointer, vector: V, distance_kind: DistanceKind, @@ -20,7 +21,6 @@ pub fn insert( let vector = vector.as_borrowed(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -84,9 +84,8 @@ pub fn insert( let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -110,7 +109,7 @@ pub fn insert( AlwaysEqual(h1_tuple.first), )); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -155,11 +154,18 @@ pub fn insert( t: code.t(), }) .unwrap(); - append(relation, list.0, &tuple, false, in_building, in_building); + append( + relation.clone(), + list.0, + &tuple, + false, + in_building, + in_building, + ); } fn append( - relation: Relation, + relation: impl RelationWrite, first: u32, tuple: &[u8], tracking_freespace: bool, @@ -168,7 +174,7 @@ fn append( ) -> (u32, u16) { if tracking_freespace { if let Some(mut write) = relation.search(tuple.len()) { - let i = write.get_mut().alloc(tuple).unwrap(); + let i = write.alloc(tuple).unwrap(); return (write.id(), i); } } @@ -176,24 +182,22 @@ fn append( let mut current = first; loop { let read = relation.read(current); - if read.get().freespace() as usize >= tuple.len() - || read.get().get_opaque().next == u32::MAX - { + if read.freespace() as usize >= tuple.len() || read.get_opaque().next == u32::MAX { drop(read); let mut write = relation.write(current, tracking_freespace); - if let Some(i) = write.get_mut().alloc(tuple) { + if let Some(i) = write.alloc(tuple) { return (current, i); } - if write.get().get_opaque().next == u32::MAX { + if write.get_opaque().next == u32::MAX { let mut extend = relation.extend(tracking_freespace); - write.get_mut().get_opaque_mut().next = extend.id(); + write.get_opaque_mut().next = extend.id(); drop(write); - if let Some(i) = extend.get_mut().alloc(tuple) { + if let Some(i) = extend.alloc(tuple) { let result = (extend.id(), i); drop(extend); if updating_skip { let mut past = relation.write(first, tracking_freespace); - let skip = &mut past.get_mut().get_opaque_mut().skip; + let skip = &mut past.get_opaque_mut().skip; assert!(*skip != u32::MAX); *skip = std::cmp::max(*skip, result.0); } @@ -202,16 +206,16 @@ fn append( panic!("a tuple cannot even be fit in a fresh page"); } } - if skipping_traversal && current == first && write.get().get_opaque().skip != first { - current = write.get().get_opaque().skip; + if skipping_traversal && current == first && write.get_opaque().skip != first { + current = write.get_opaque().skip; } else { - current = write.get().get_opaque().next; + current = write.get_opaque().next; } } else { - if skipping_traversal && current == first && read.get().get_opaque().skip != first { - current = read.get().get_opaque().skip; + if skipping_traversal && current == first && read.get_opaque().skip != first { + current = read.get_opaque().skip; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } } diff --git a/src/vchordrq/algorithm/mod.rs 
b/src/vchordrq/algorithm/mod.rs index 88239a8..41744d7 100644 --- a/src/vchordrq/algorithm/mod.rs +++ b/src/vchordrq/algorithm/mod.rs @@ -6,3 +6,62 @@ pub mod scan; pub mod tuples; pub mod vacuum; pub mod vectors; + +use crate::postgres::Page; +use std::ops::{Deref, DerefMut}; + +pub trait PageGuard { + fn id(&self) -> u32; +} + +pub trait RelationRead { + type ReadGuard<'a>: PageGuard + Deref + where + Self: 'a; + fn read(&self, id: u32) -> Self::ReadGuard<'_>; +} + +pub trait RelationWrite: RelationRead { + type WriteGuard<'a>: PageGuard + DerefMut + where + Self: 'a; + fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_>; + fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_>; + fn search(&self, freespace: usize) -> Option>; +} + +impl PageGuard for crate::postgres::BufferReadGuard { + fn id(&self) -> u32 { + self.id() + } +} + +impl PageGuard for crate::postgres::BufferWriteGuard { + fn id(&self) -> u32 { + self.id() + } +} + +impl RelationRead for crate::postgres::Relation { + type ReadGuard<'a> = crate::postgres::BufferReadGuard; + + fn read(&self, id: u32) -> Self::ReadGuard<'_> { + self.read(id) + } +} + +impl RelationWrite for crate::postgres::Relation { + type WriteGuard<'a> = crate::postgres::BufferWriteGuard; + + fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.write(id, tracking_freespace) + } + + fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.extend(tracking_freespace) + } + + fn search(&self, freespace: usize) -> Option> { + self.search(freespace) + } +} diff --git a/src/vchordrq/algorithm/prewarm.rs b/src/vchordrq/algorithm/prewarm.rs index 26bb986..6d01f7c 100644 --- a/src/vchordrq/algorithm/prewarm.rs +++ b/src/vchordrq/algorithm/prewarm.rs @@ -1,13 +1,12 @@ -use crate::postgres::Relation; +use super::RelationRead; use crate::vchordrq::algorithm::tuples::*; use crate::vchordrq::algorithm::vectors; use std::fmt::Write; -pub fn prewarm(relation: Relation, height: i32) -> String { +pub fn prewarm(relation: impl RelationRead + Clone, height: i32) -> String { let mut message = String::new(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -37,9 +36,8 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -47,7 +45,7 @@ pub fn prewarm(relation: Relation, height: i32) -> String { vectors::vector_warm::(relation.clone(), h1_tuple.mean); results.push(h1_tuple.first); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); @@ -66,16 +64,15 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let _h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") .expect("data corruption"); results.push(()); } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); diff --git a/src/vchordrq/algorithm/scan.rs 
b/src/vchordrq/algorithm/scan.rs index df4d93d..42357c0 100644 --- a/src/vchordrq/algorithm/scan.rs +++ b/src/vchordrq/algorithm/scan.rs @@ -1,4 +1,4 @@ -use crate::postgres::Relation; +use super::RelationRead; use crate::vchordrq::algorithm::rabitq::fscan_process_lowerbound; use crate::vchordrq::algorithm::tuples::*; use crate::vchordrq::algorithm::vectors; @@ -11,7 +11,7 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; pub fn scan( - relation: Relation, + relation: impl RelationRead + Clone, vector: V, distance_kind: DistanceKind, probes: Vec, @@ -20,7 +20,6 @@ pub fn scan( let vector = vector.as_borrowed(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -66,9 +65,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -92,7 +90,7 @@ pub fn scan( AlwaysEqual(h1_tuple.first), )); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -121,6 +119,7 @@ pub fn scan( for i in (1..meta_tuple.height_of_root).rev() { lists = make_lists(lists, probes[i as usize - 1]); } + drop(meta_guard); { let mut results = Vec::new(); for list in lists { @@ -138,9 +137,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -164,7 +162,7 @@ pub fn scan( AlwaysEqual(h0_tuple.payload), )); } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); diff --git a/src/vchordrq/algorithm/vacuum.rs b/src/vchordrq/algorithm/vacuum.rs index 2b219c4..c77bd21 100644 --- a/src/vchordrq/algorithm/vacuum.rs +++ b/src/vchordrq/algorithm/vacuum.rs @@ -1,13 +1,16 @@ -use crate::postgres::Relation; +use super::RelationWrite; use crate::vchordrq::algorithm::tuples::*; use base::search::Pointer; -pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) -> bool) { +pub fn vacuum( + relation: impl RelationWrite, + delay: impl Fn(), + callback: impl Fn(Pointer) -> bool, +) { // step 1: vacuum height_0_tuple { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -19,16 +22,15 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn let mut current = first; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") .expect("data corruption"); results.push(h1_tuple.first); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } results @@ -42,9 +44,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn delay(); let mut h0_guard = relation.write(current, false); let mut reconstruct_removes = Vec::new(); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data 
corruption") @@ -53,8 +54,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn reconstruct_removes.push(i); } } - h0_guard.get_mut().reconstruct(&reconstruct_removes); - current = h0_guard.get().get_opaque().next; + h0_guard.reconstruct(&reconstruct_removes); + current = h0_guard.get_opaque().next; } } } @@ -63,7 +64,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn let mut current = { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -74,8 +74,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn delay(); let read = relation.read(current); let flag = 'flag: { - for i in 1..=read.get().len() { - let Some(vector_tuple) = read.get().get(i) else { + for i in 1..=read.len() { + let Some(vector_tuple) = read.get(i) else { continue; }; let vector_tuple = @@ -91,21 +91,21 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn if flag { drop(read); let mut write = relation.write(current, true); - for i in 1..=write.get().len() { - let Some(vector_tuple) = write.get().get(i) else { + for i in 1..=write.len() { + let Some(vector_tuple) = write.get(i) else { continue; }; let vector_tuple = unsafe { rkyv::archived_root::>(vector_tuple) }; if let Some(payload) = vector_tuple.payload.as_ref().copied() { if callback(Pointer::new(payload)) { - write.get_mut().free(i); + write.free(i); } } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } } diff --git a/src/vchordrq/algorithm/vectors.rs b/src/vchordrq/algorithm/vectors.rs index 6a23f74..06075d3 100644 --- a/src/vchordrq/algorithm/vectors.rs +++ b/src/vchordrq/algorithm/vectors.rs @@ -1,11 +1,11 @@ use super::tuples::Vector; -use crate::postgres::Relation; +use super::RelationRead; use crate::vchordrq::algorithm::tuples::VectorTuple; use base::distance::Distance; use base::distance::DistanceKind; pub fn vector_dist( - relation: Relation, + relation: impl RelationRead, vector: V::Borrowed<'_>, mean: (u32, u16), payload: Option, @@ -25,7 +25,7 @@ pub fn vector_dist( return None; }; let vector_guard = relation.read(mean.0); - let Some(vector_tuple) = vector_guard.get().get(mean.1) else { + let Some(vector_tuple) = vector_guard.get(mean.1) else { // fails consistency check return None; }; @@ -54,11 +54,11 @@ pub fn vector_dist( )) } -pub fn vector_warm(relation: Relation, mean: (u32, u16)) { +pub fn vector_warm(relation: impl RelationRead, mean: (u32, u16)) { let mut cursor = Ok(mean); while let Ok(mean) = cursor { let vector_guard = relation.read(mean.0); - let Some(vector_tuple) = vector_guard.get().get(mean.1) else { + let Some(vector_tuple) = vector_guard.get(mean.1) else { // fails consistency check return; }; diff --git a/src/vchordrq/index/am.rs b/src/vchordrq/index/am.rs index 8f22db6..9a4e06b 100644 --- a/src/vchordrq/index/am.rs +++ b/src/vchordrq/index/am.rs @@ -1,7 +1,8 @@ -use crate::postgres::Relation; -use crate::vchordrq::algorithm; +use crate::postgres::{Page, Relation}; use crate::vchordrq::algorithm::build::{HeapRelation, Reporter}; use crate::vchordrq::algorithm::tuples::Vector; +use crate::vchordrq::algorithm::{self, PageGuard}; +use crate::vchordrq::algorithm::{RelationRead, RelationWrite}; use crate::vchordrq::index::am_options::{Opfamily, Reloption}; use crate::vchordrq::index::am_scan::Scanner; use 
crate::vchordrq::index::utils::{ctid_to_pointer, pointer_to_ctid}; @@ -12,6 +13,8 @@ use base::vector::VectOwned; use half::f16; use pgrx::datum::Internal; use pgrx::pg_sys::Datum; +use std::collections::HashMap; +use std::ops::Deref; static mut RELOPT_KIND_VCHORDRQ: pgrx::pg_sys::relopt_kind::Type = 0; @@ -259,7 +262,52 @@ pub unsafe extern "C" fn ambuild( reporter.clone(), ), } - if let Some(leader) = unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent) } + let cache = { + let n = index_relation.len(); + let mut dir = HashMap::::with_capacity(n as _); + let mut pages = Vec::>::new(); + { + use crate::vchordrq::algorithm::tuples::{Height1Tuple, MetaTuple}; + let mut read = |id| { + let result = index_relation.read(id); + dir.insert(id, pages.len()); + pages.push(result.clone_into_boxed()); + result + }; + let meta_guard = read(0); + let meta_tuple = meta_guard + .get(1) + .map(rkyv::check_archived_root::) + .expect("data corruption") + .expect("data corruption"); + let mut firsts = vec![meta_tuple.first]; + let mut make_firsts = |firsts| { + let mut results = Vec::new(); + for first in firsts { + let mut current = first; + while current != u32::MAX { + let h1_guard = read(current); + for i in 1..=h1_guard.len() { + let h1_tuple = h1_guard + .get(i) + .map(rkyv::check_archived_root::) + .expect("data corruption") + .expect("data corruption"); + results.push(h1_tuple.first); + } + current = h1_guard.get_opaque().next; + } + } + results + }; + for _ in (1..meta_tuple.height_of_root).rev() { + firsts = make_firsts(firsts); + } + } + (dir, pages) + }; + if let Some(leader) = + unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent, &cache) } { unsafe { parallel_build( @@ -269,6 +317,8 @@ pub unsafe extern "C" fn ambuild( leader.tablescandesc, leader.vchordrqshared, Some(reporter), + &*leader.cache_0, + &*leader.cache_1, ); leader.wait(); let nparticipants = leader.nparticipants; @@ -289,6 +339,11 @@ pub unsafe extern "C" fn ambuild( } else { let mut indtuples = 0; reporter.tuples_done(indtuples); + let relation = unsafe { Relation::new(index) }; + let relation = CachingRelation { + cache: &(cache.0, cache.1.iter().map(|x| x.as_ref()).collect()), + relation, + }; match opfamily.vector_kind() { VectorKind::Vecf32 => { HeapRelation::>::traverse( @@ -296,7 +351,7 @@ pub unsafe extern "C" fn ambuild( true, |(pointer, vector)| { algorithm::insert::insert::>( - unsafe { Relation::new(index) }, + relation.clone(), pointer, vector, opfamily.distance_kind(), @@ -313,7 +368,7 @@ pub unsafe extern "C" fn ambuild( true, |(pointer, vector)| { algorithm::insert::insert::>( - unsafe { Relation::new(index) }, + relation.clone(), pointer, vector, opfamily.distance_kind(), @@ -329,11 +384,78 @@ pub unsafe extern "C" fn ambuild( unsafe { pgrx::pgbox::PgBox::::alloc0().into_pg() } } +#[derive(Clone)] +struct CachingRelation<'a, R> { + cache: &'a (HashMap, Vec<&'a Page>), + relation: R, +} + +enum CachingRelationReadGuard<'a, G> { + Wrapping(G), + Cached(u32, &'a Page), +} + +impl PageGuard for CachingRelationReadGuard<'_, G> { + fn id(&self) -> u32 { + match self { + CachingRelationReadGuard::Wrapping(x) => x.id(), + CachingRelationReadGuard::Cached(id, _) => *id, + } + } +} + +impl> Deref for CachingRelationReadGuard<'_, G> { + type Target = Page; + + fn deref(&self) -> &Self::Target { + match self { + CachingRelationReadGuard::Wrapping(x) => x, + CachingRelationReadGuard::Cached(_, page) => page, + } + } +} + +impl RelationRead for CachingRelation<'_, R> { + type 
ReadGuard<'a> + = CachingRelationReadGuard<'a, R::ReadGuard<'a>> + where + Self: 'a; + + fn read(&self, id: u32) -> Self::ReadGuard<'_> { + if let Some(&x) = self.cache.0.get(&id) { + CachingRelationReadGuard::Cached(id, self.cache.1[x]) + } else { + CachingRelationReadGuard::Wrapping(self.relation.read(id)) + } + } +} + +impl RelationWrite for CachingRelation<'_, R> { + type WriteGuard<'a> + = R::WriteGuard<'a> + where + Self: 'a; + + fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.relation.write(id, tracking_freespace) + } + + fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.relation.extend(tracking_freespace) + } + + fn search(&self, freespace: usize) -> Option> { + self.relation.search(freespace) + } +} + struct VchordrqShared { /* Immutable state */ heaprelid: pgrx::pg_sys::Oid, indexrelid: pgrx::pg_sys::Oid, isconcurrent: bool, + est_cache_0: usize, + est_cache_1: usize, /* Worker progress */ workersdonecv: pgrx::pg_sys::ConditionVariable, @@ -359,6 +481,8 @@ struct VchordrqLeader { nparticipants: i32, vchordrqshared: *mut VchordrqShared, tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData, + cache_0: *const [u8], + cache_1: *const [u8], snapshot: pgrx::pg_sys::Snapshot, } @@ -367,7 +491,10 @@ impl VchordrqLeader { heap: pgrx::pg_sys::Relation, index: pgrx::pg_sys::Relation, isconcurrent: bool, + cache: &(HashMap, Vec>), ) -> Option { + let cache_mapping: Vec = bincode::serialize(&cache.0).unwrap(); + unsafe fn compute_parallel_workers( heap: pgrx::pg_sys::Relation, index: pgrx::pg_sys::Relation, @@ -418,11 +545,17 @@ impl VchordrqLeader { } let est_tablescandesc = unsafe { pgrx::pg_sys::table_parallelscan_estimate(heap, snapshot) }; + let est_cache_0 = cache_mapping.len(); + let est_cache_1 = cache.1.len() * size_of::(); unsafe { estimate_chunk(&mut (*pcxt).estimator, size_of::()); estimate_keys(&mut (*pcxt).estimator, 1); estimate_chunk(&mut (*pcxt).estimator, est_tablescandesc); estimate_keys(&mut (*pcxt).estimator, 1); + estimate_chunk(&mut (*pcxt).estimator, est_cache_0); + estimate_keys(&mut (*pcxt).estimator, 1); + estimate_chunk(&mut (*pcxt).estimator, est_cache_1); + estimate_keys(&mut (*pcxt).estimator, 1); } unsafe { @@ -449,6 +582,8 @@ impl VchordrqLeader { mutex: std::mem::zeroed(), nparticipantsdone: 0, indtuples: 0, + est_cache_0, + est_cache_1, }); pgrx::pg_sys::ConditionVariableInit(&raw mut (*vchordrqshared).workersdonecv); pgrx::pg_sys::SpinLockInit(&raw mut (*vchordrqshared).mutex); @@ -462,9 +597,29 @@ impl VchordrqLeader { tablescandesc }; + let cache_0 = unsafe { + let cache_0 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_0).cast::(); + std::ptr::copy(cache_mapping.as_ptr(), cache_0, est_cache_0); + core::ptr::slice_from_raw_parts(cache_0, est_cache_0) + }; + + let cache_1 = unsafe { + let cache_1 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_1).cast::(); + for i in 0..cache.1.len() { + std::ptr::copy( + (cache.1[i].deref() as *const Page).cast::(), + cache_1.cast::().add(i).cast(), + size_of::(), + ); + } + core::ptr::slice_from_raw_parts(cache_1, est_cache_1) + }; + unsafe { pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000001, vchordrqshared.cast()); pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000002, tablescandesc.cast()); + pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000003, cache_0 as _); + pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000004, cache_1 as _); } unsafe { @@ -490,6 +645,8 @@ impl VchordrqLeader { nparticipants: 
nworkers_launched + 1, vchordrqshared, tablescandesc, + cache_0, + cache_1, snapshot, }) } @@ -529,6 +686,18 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main( pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000002, false) .cast::() }; + let cache_0 = unsafe { + std::slice::from_raw_parts( + pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000003, false).cast::(), + (*vchordrqshared).est_cache_0, + ) + }; + let cache_1 = unsafe { + std::slice::from_raw_parts( + pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000004, false).cast::(), + (*vchordrqshared).est_cache_1, + ) + }; let heap_lockmode; let index_lockmode; if unsafe { !(*vchordrqshared).isconcurrent } { @@ -546,7 +715,16 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main( } unsafe { - parallel_build(index, heap, index_info, tablescandesc, vchordrqshared, None); + parallel_build( + index, + heap, + index_info, + tablescandesc, + vchordrqshared, + None, + cache_0, + cache_1, + ); } unsafe { @@ -562,6 +740,8 @@ unsafe fn parallel_build( tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData, vchordrqshared: *mut VchordrqShared, mut reporter: Option, + cache_0: &[u8], + cache_1: &[u8], ) { #[derive(Debug, Clone)] pub struct Heap { @@ -627,6 +807,21 @@ unsafe fn parallel_build( } let index_relation = unsafe { Relation::new(index) }; + let index_relation = CachingRelation { + cache: { + let cache_0: HashMap = bincode::deserialize(cache_0).unwrap(); + assert!(cache_1.len() % size_of::() == 0); + let n = cache_1.len() / size_of::(); + let cache_1 = unsafe { + (0..n) + .map(|i| &*cache_1.as_ptr().cast::().add(i)) + .collect::>() + }; + &(cache_0, cache_1) + }, + relation: index_relation, + }; + let scan = unsafe { pgrx::pg_sys::table_beginscan_parallel(heap, tablescandesc) }; let opfamily = unsafe { am_options::opfamily(index) }; let heap_relation = Heap { diff --git a/src/vchordrqfscan/algorithm/build.rs b/src/vchordrqfscan/algorithm/build.rs index 25a6611..9a0daa6 100644 --- a/src/vchordrqfscan/algorithm/build.rs +++ b/src/vchordrqfscan/algorithm/build.rs @@ -167,7 +167,7 @@ pub fn build( } pointer_of_firsts.push(level); } - forwards.head.get_mut().get_opaque_mut().skip = vectors.first(); + forwards.head.get_opaque_mut().skip = vectors.first(); meta.push(&MetaTuple { dims, height_of_root: structures.len() as u32, @@ -399,13 +399,13 @@ where { fn push(&mut self, x: &T) -> (u32, u16) { let bytes = rkyv::to_bytes(x).expect("failed to serialize"); - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { let next = self.relation.extend(self.tracking_freespace); - self.head.get_mut().get_opaque_mut().next = next.id(); + self.head.get_opaque_mut().next = next.id(); self.head = next; - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { panic!("tuple is too large to fit in a fresh page") diff --git a/src/vchordrqfscan/algorithm/insert.rs b/src/vchordrqfscan/algorithm/insert.rs index 4dfd432..1a0ab77 100644 --- a/src/vchordrqfscan/algorithm/insert.rs +++ b/src/vchordrqfscan/algorithm/insert.rs @@ -14,7 +14,6 @@ use std::collections::BinaryHeap; pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_kind: DistanceKind) { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -35,22 +34,18 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k }) .unwrap(); if let 
Some(mut write) = relation.search(tuple.len()) { - let i = write.get_mut().alloc(&tuple).unwrap(); + let i = write.alloc(&tuple).unwrap(); break 'h0_vector (write.id(), i); } - let mut current = relation - .read(meta_tuple.forwards_first) - .get() - .get_opaque() - .skip; + let mut current = relation.read(meta_tuple.forwards_first).get_opaque().skip; let mut changed = false; loop { let read = relation.read(current); let flag = 'flag: { - if read.get().freespace() as usize >= tuple.len() { + if read.freespace() as usize >= tuple.len() { break 'flag true; } - if read.get().get_opaque().next == u32::MAX { + if read.get_opaque().next == u32::MAX { break 'flag true; } false @@ -58,28 +53,27 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k if flag { drop(read); let mut write = relation.write(current, true); - if let Some(i) = write.get_mut().alloc(&tuple) { + if let Some(i) = write.alloc(&tuple) { break (current, i); } - if write.get().get_opaque().next == u32::MAX { + if write.get_opaque().next == u32::MAX { if changed { relation .write(meta_tuple.forwards_first, false) - .get_mut() .get_opaque_mut() .skip = write.id(); } let mut extend = relation.extend(true); - write.get_mut().get_opaque_mut().next = extend.id(); - if let Some(i) = extend.get_mut().alloc(&tuple) { + write.get_opaque_mut().next = extend.id(); + if let Some(i) = extend.alloc(&tuple) { break (extend.id(), i); } else { panic!("a tuple cannot even be fit in a fresh page"); } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } changed = true; } @@ -90,7 +84,6 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k if is_residual { let vector_guard = relation.read(meta_tuple.mean.0); let vector_tuple = vector_guard - .get() .get(meta_tuple.mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -111,9 +104,8 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -141,7 +133,7 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -151,7 +143,6 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); let vector_guard = relation.read(mean.0); let vector_tuple = vector_guard - .get() .get(mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -196,9 +187,8 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k loop { let read = relation.read(current); let flag = 'flag: { - for i in 1..=read.get().len() { + for i in 1..=read.len() { let h0_tuple = read - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -207,10 +197,10 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k break 'flag true; } } - if read.get().freespace() as usize >= dummy.len() { + if read.freespace() as usize >= dummy.len() { break 'flag true; } - if read.get().get_opaque().next == u32::MAX { + if read.get_opaque().next == u32::MAX { break 'flag true; } 
false @@ -218,9 +208,9 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k if flag { drop(read); let mut write = relation.write(current, false); - for i in 1..=write.get().len() { + for i in 1..=write.len() { let flag = put( - write.get_mut().get_mut(i).expect("data corruption"), + write.get_mut(i).expect("data corruption"), dims, &code, h0_vector, @@ -230,9 +220,9 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k return; } } - if let Some(i) = write.get_mut().alloc(&dummy) { + if let Some(i) = write.alloc(&dummy) { let flag = put( - write.get_mut().get_mut(i).expect("data corruption"), + write.get_mut(i).expect("data corruption"), dims, &code, h0_vector, @@ -241,12 +231,12 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k assert!(flag, "a put fails even on a fresh tuple"); return; } - if write.get().get_opaque().next == u32::MAX { + if write.get_opaque().next == u32::MAX { let mut extend = relation.extend(false); - write.get_mut().get_opaque_mut().next = extend.id(); - if let Some(i) = extend.get_mut().alloc(&dummy) { + write.get_opaque_mut().next = extend.id(); + if let Some(i) = extend.alloc(&dummy) { let flag = put( - extend.get_mut().get_mut(i).expect("data corruption"), + extend.get_mut(i).expect("data corruption"), dims, &code, h0_vector, @@ -258,9 +248,9 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k panic!("a tuple cannot even be fit in a fresh page"); } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } } diff --git a/src/vchordrqfscan/algorithm/prewarm.rs b/src/vchordrqfscan/algorithm/prewarm.rs index ec7642a..ec8976a 100644 --- a/src/vchordrqfscan/algorithm/prewarm.rs +++ b/src/vchordrqfscan/algorithm/prewarm.rs @@ -6,7 +6,6 @@ pub fn prewarm(relation: Relation, height: i32) -> String { let mut message = String::new(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -22,7 +21,6 @@ pub fn prewarm(relation: Relation, height: i32) -> String { { let vector_guard = relation.read(meta_tuple.mean.0); let vector_tuple = vector_guard - .get() .get(meta_tuple.mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -43,9 +41,8 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -56,7 +53,6 @@ pub fn prewarm(relation: Relation, height: i32) -> String { let mean = h1_tuple.mean[j]; let vector_guard = relation.read(mean.0); let vector_tuple = vector_guard - .get() .get(mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -65,7 +61,7 @@ pub fn prewarm(relation: Relation, height: i32) -> String { } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); @@ -84,9 +80,8 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) 
.expect("data corruption") @@ -97,7 +92,7 @@ pub fn prewarm(relation: Relation, height: i32) -> String { } } } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); diff --git a/src/vchordrqfscan/algorithm/scan.rs b/src/vchordrqfscan/algorithm/scan.rs index 202949e..a691b6c 100644 --- a/src/vchordrqfscan/algorithm/scan.rs +++ b/src/vchordrqfscan/algorithm/scan.rs @@ -20,7 +20,6 @@ pub fn scan( ) -> impl Iterator { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -41,7 +40,6 @@ pub fn scan( if is_residual { let vector_guard = relation.read(meta_tuple.mean.0); let vector_tuple = vector_guard - .get() .get(meta_tuple.mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -62,9 +60,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -92,7 +89,7 @@ pub fn scan( } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -102,7 +99,6 @@ pub fn scan( let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); let vector_guard = relation.read(mean.0); let vector_tuple = vector_guard - .get() .get(mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -138,9 +134,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -168,7 +163,7 @@ pub fn scan( } } } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -177,7 +172,7 @@ pub fn scan( while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { let (_, AlwaysEqual(mean), AlwaysEqual(pay_u)) = heap.pop().unwrap(); let vector_guard = relation.read(mean.0); - let Some(vector_tuple) = vector_guard.get().get(mean.1) else { + let Some(vector_tuple) = vector_guard.get(mean.1) else { // fails consistency check continue; }; diff --git a/src/vchordrqfscan/algorithm/vacuum.rs b/src/vchordrqfscan/algorithm/vacuum.rs index 7bb5179..8ede4f6 100644 --- a/src/vchordrqfscan/algorithm/vacuum.rs +++ b/src/vchordrqfscan/algorithm/vacuum.rs @@ -8,7 +8,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -20,9 +19,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - let mut current = first; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -33,7 +31,7 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } results @@ -46,9 +44,8 @@ pub fn vacuum(relation: Relation, delay: impl 
Fn(), callback: impl Fn(Pointer) - while current != u32::MAX { delay(); let mut h0_guard = relation.write(current, false); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -64,7 +61,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - if flag { // todo: use mutable API let mut temp = h0_guard - .get() .get(i) .map(rkyv::from_bytes::) .expect("data corruption") @@ -76,14 +72,13 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - } let temp = rkyv::to_bytes::<_, 8192>(&temp).expect("failed to serialize"); h0_guard - .get_mut() .get_mut(i) .expect("data corruption") .copy_from_slice(&temp); } } // todo: cross-tuple vacuum so that we can skip a tuple - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } } @@ -92,7 +87,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - let mut current = { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -103,8 +97,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - delay(); let read = relation.read(current); let flag = 'flag: { - for i in 1..=read.get().len() { - let Some(vector_tuple) = read.get().get(i) else { + for i in 1..=read.len() { + let Some(vector_tuple) = read.get(i) else { continue; }; let vector_tuple = rkyv::check_archived_root::(vector_tuple) @@ -120,21 +114,21 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - if flag { drop(read); let mut write = relation.write(current, true); - for i in 1..=write.get().len() { - let Some(vector_tuple) = write.get().get(i) else { + for i in 1..=write.len() { + let Some(vector_tuple) = write.get(i) else { continue; }; let vector_tuple = rkyv::check_archived_root::(vector_tuple) .expect("data corruption"); if let Some(payload) = vector_tuple.payload.as_ref().copied() { if callback(Pointer::new(payload)) { - write.get_mut().free(i); + write.free(i); } } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } }
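
The postgres.rs changes replace the explicit `get()`/`get_mut()` accessors on `BufferReadGuard` and `BufferWriteGuard` with `Deref`/`DerefMut` implementations, which is what lets every later hunk drop its `.get()` calls. A minimal sketch of that guard pattern, with an illustrative `Page` field and none of the pin/lock/WAL bookkeeping the real guards do in `Drop`:

```rust
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;

struct Page {
    freespace: u16, // illustrative; the real Page computes this from its header
}

// Simplified stand-in for BufferWriteGuard: it holds a pointer to a pinned,
// exclusively-locked page for as long as the guard lives.
struct BufferWriteGuard {
    page: NonNull<Page>,
    id: u32,
}

impl BufferWriteGuard {
    fn id(&self) -> u32 {
        self.id
    }
}

impl Deref for BufferWriteGuard {
    type Target = Page;
    fn deref(&self) -> &Page {
        unsafe { self.page.as_ref() }
    }
}

impl DerefMut for BufferWriteGuard {
    fn deref_mut(&mut self) -> &mut Page {
        unsafe { self.page.as_mut() }
    }
}

fn main() {
    let mut page = Page { freespace: 128 };
    let mut guard = BufferWriteGuard {
        page: NonNull::from(&mut page),
        id: 0,
    };
    // Call sites drop the old `.get()` / `.get_mut()` and use the page directly.
    guard.freespace = 64;
    assert_eq!(guard.freespace, 64);
    assert_eq!(guard.id(), 0);
}
```
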
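
`Page::clone_into_boxed` lets the build leader snapshot a page into an owned heap copy. A standalone sketch of the same copy-into-`Box` idea for a plain fixed-size struct; it assumes a toolchain where `Box::new_uninit` is stable and uses `copy_nonoverlapping` instead of the patch's `ptr::copy` (either works here, since the source and destination cannot overlap):

```rust
use std::mem::MaybeUninit;

// Stand-in for the 8 KiB PostgreSQL page used by the extension.
#[repr(C, align(8))]
struct Page {
    bytes: [u8; 8192],
}

impl Page {
    // Copy the page into a fresh heap allocation without building a
    // temporary copy on the stack first.
    fn clone_into_boxed(&self) -> Box<Page> {
        let mut result: Box<MaybeUninit<Page>> = Box::new_uninit();
        unsafe {
            std::ptr::copy_nonoverlapping(self as *const Page, result.as_mut_ptr(), 1);
            result.assume_init()
        }
    }
}

fn main() {
    let page = Page { bytes: [7; 8192] };
    let copy = page.clone_into_boxed();
    assert_eq!(copy.bytes[0], 7);
}
```
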
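
The new traits in `vchordrq/algorithm/mod.rs` decouple the algorithms from the concrete pgrx-backed `Relation`: `RelationRead`/`RelationWrite` expose their guard types as generic associated types, and `insert`, `scan`, `vacuum`, `prewarm`, and the build `Tape` now take `impl RelationRead` / `impl RelationWrite`. A self-contained sketch of the trait shape with a hypothetical in-memory implementor (`MemRelation`, `chain_length`, and the `next` link exist only for illustration):

```rust
use std::cell::{Ref, RefCell};
use std::ops::{Deref, DerefMut};

struct Page {
    // illustrative link to the next page in a chain; u32::MAX terminates it
    next: u32,
}

impl Page {
    fn next(&self) -> u32 {
        self.next
    }
}

trait PageGuard {
    fn id(&self) -> u32;
}

trait RelationRead {
    type ReadGuard<'a>: PageGuard + Deref<Target = Page>
    where
        Self: 'a;
    fn read(&self, id: u32) -> Self::ReadGuard<'_>;
}

trait RelationWrite: RelationRead {
    type WriteGuard<'a>: PageGuard + DerefMut<Target = Page>
    where
        Self: 'a;
    fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_>;
    fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_>;
    fn search(&self, freespace: usize) -> Option<Self::WriteGuard<'_>>;
}

// A toy in-memory implementor, just to show a second backend besides the
// pgrx-backed `Relation`.
struct MemRelation {
    pages: RefCell<Vec<Box<Page>>>,
}

struct MemReadGuard<'a> {
    id: u32,
    page: Ref<'a, Page>,
}

impl PageGuard for MemReadGuard<'_> {
    fn id(&self) -> u32 {
        self.id
    }
}

impl Deref for MemReadGuard<'_> {
    type Target = Page;
    fn deref(&self) -> &Page {
        &self.page
    }
}

impl RelationRead for MemRelation {
    type ReadGuard<'a>
        = MemReadGuard<'a>
    where
        Self: 'a;

    fn read(&self, id: u32) -> Self::ReadGuard<'_> {
        MemReadGuard {
            id,
            page: Ref::map(self.pages.borrow(), |pages| &*pages[id as usize]),
        }
    }
}

// Algorithms like `prewarm` or `vacuum` can now walk a page chain through the
// trait instead of through the concrete pgrx relation.
fn chain_length(relation: &impl RelationRead, first: u32) -> u32 {
    let mut count = 0;
    let mut current = first;
    while current != u32::MAX {
        let guard = relation.read(current);
        count += 1;
        current = guard.next();
    }
    count
}

fn main() {
    let relation = MemRelation {
        pages: RefCell::new(vec![Box::new(Page { next: u32::MAX })]),
    };
    assert_eq!(chain_length(&relation, 0), 1);
}
```

In the patch itself, the implementation for the pgrx-backed `Relation` simply forwards `read`, `write`, `extend`, and `search` to its existing inherent methods.
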
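
`ambuild` snapshots the meta page and the upper-level cluster pages before the parallel scan starts, then wraps the index relation in `CachingRelation`, whose read guard either borrows from that snapshot or falls back to a real buffer read. A compile-only sketch of that read-through shape, simplified to a single `ReadRelation` trait (the names here are stand-ins, not the extension's API):

```rust
use std::collections::HashMap;
use std::ops::Deref;

// Stand-ins; in the patch these are `Page` and the `RelationRead` trait above.
struct Page {}

trait ReadRelation {
    type Guard<'a>: Deref<Target = Page>
    where
        Self: 'a;
    fn read(&self, id: u32) -> Self::Guard<'_>;
}

// A read guard that is either borrowed from the leader's snapshot or wraps a
// real buffer read from the underlying relation.
enum CachingReadGuard<'a, G> {
    Wrapping(G),
    Cached(&'a Page),
}

impl<G: Deref<Target = Page>> Deref for CachingReadGuard<'_, G> {
    type Target = Page;
    fn deref(&self) -> &Page {
        match self {
            CachingReadGuard::Wrapping(guard) => guard,
            CachingReadGuard::Cached(page) => page,
        }
    }
}

struct CachingRelation<'a, R> {
    // block number -> slot in `pages`
    directory: &'a HashMap<u32, usize>,
    pages: &'a [&'a Page],
    inner: R,
}

impl<R: ReadRelation> ReadRelation for CachingRelation<'_, R> {
    type Guard<'a>
        = CachingReadGuard<'a, R::Guard<'a>>
    where
        Self: 'a;

    fn read(&self, id: u32) -> Self::Guard<'_> {
        if let Some(&slot) = self.directory.get(&id) {
            // Meta and upper-level pages were snapshotted before the build
            // started, so reads of them never touch the buffer pool.
            CachingReadGuard::Cached(self.pages[slot])
        } else {
            CachingReadGuard::Wrapping(self.inner.read(id))
        }
    }
}
```

Writes are deliberately not cached: in the patch, `RelationWrite` for `CachingRelation` just forwards `write`, `extend`, and `search` to the wrapped relation.
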
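
The snapshot handed to parallel workers has two parts: the `HashMap<u32, usize>` directory from block number to slot, serialized with the newly added `bincode` dependency, and the raw 8 KiB pages copied byte-for-byte; both are placed in the parallel context's shared-memory TOC and rebuilt by each worker. A sketch of just the directory round-trip (assumes `bincode = "1.3"` plus `serde`, as in the patched Cargo.toml; the `shm_toc_*` plumbing is omitted):

```rust
use std::collections::HashMap;

fn main() {
    // Directory built by the leader: block number -> index into the page cache.
    let mut directory = HashMap::<u32, usize>::new();
    directory.insert(0, 0);
    directory.insert(7, 1);
    directory.insert(42, 2);

    // Leader side: turn the directory into bytes that can be copied into the
    // parallel context's shared memory.
    let bytes: Vec<u8> = bincode::serialize(&directory).expect("failed to serialize");

    // Worker side: rebuild the directory from the raw bytes looked up in the
    // shared-memory TOC.
    let restored: HashMap<u32, usize> =
        bincode::deserialize(&bytes).expect("failed to deserialize");
    assert_eq!(directory, restored);
}
```
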