Skip to content

Commit

Permalink
chore: impl dereference traits for page guards (#156)
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <[email protected]>
  • Loading branch information
usamoi authored Dec 27, 2024
1 parent 01835f0 commit 3e23a14
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 176 deletions.
61 changes: 42 additions & 19 deletions src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::mem::{offset_of, MaybeUninit};
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;

const _: () = assert!(
Expand Down Expand Up @@ -46,6 +47,14 @@ impl Page {
assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize);
this
}
#[allow(dead_code)]
pub fn clone_into_boxed(&self) -> Box<Self> {
let mut result = Box::new_uninit();
unsafe {
std::ptr::copy(self as *const Self, result.as_mut_ptr(), 1);
result.assume_init()
}
}
pub fn get_opaque(&self) -> &Opaque {
&self.opaque
}
Expand Down Expand Up @@ -170,15 +179,20 @@ pub struct BufferReadGuard {
}

impl BufferReadGuard {
pub fn get(&self) -> &Page {
unsafe { self.page.as_ref() }
}
#[allow(dead_code)]
pub fn id(&self) -> u32 {
self.id
}
}

impl Deref for BufferReadGuard {
type Target = Page;

fn deref(&self) -> &Page {
unsafe { self.page.as_ref() }
}
}

impl Drop for BufferReadGuard {
fn drop(&mut self) {
unsafe {
Expand All @@ -197,15 +211,23 @@ pub struct BufferWriteGuard {
}

impl BufferWriteGuard {
pub fn get(&self) -> &Page {
pub fn id(&self) -> u32 {
self.id
}
}

impl Deref for BufferWriteGuard {
type Target = Page;

fn deref(&self) -> &Page {
unsafe { self.page.as_ref() }
}
pub fn get_mut(&mut self) -> &mut Page {
}

impl DerefMut for BufferWriteGuard {
fn deref_mut(&mut self) -> &mut Page {
unsafe { self.page.as_mut() }
}
pub fn id(&self) -> u32 {
self.id
}
}

impl Drop for BufferWriteGuard {
Expand All @@ -215,11 +237,7 @@ impl Drop for BufferWriteGuard {
pgrx::pg_sys::GenericXLogAbort(self.state);
} else {
if self.tracking_freespace {
pgrx::pg_sys::RecordPageWithFreeSpace(
self.raw,
self.id,
self.get().freespace() as _,
);
pgrx::pg_sys::RecordPageWithFreeSpace(self.raw, self.id, self.freespace() as _);
pgrx::pg_sys::FreeSpaceMapVacuumRange(self.raw, self.id, self.id + 1);
}
pgrx::pg_sys::GenericXLogFinish(self.state);
Expand Down Expand Up @@ -329,18 +347,23 @@ impl Relation {
return None;
}
let write = self.write(id, true);
if write.get().freespace() < freespace as _ {
if write.freespace() < freespace as _ {
// the free space is recorded incorrectly
pgrx::pg_sys::RecordPageWithFreeSpace(
self.raw,
id,
write.get().freespace() as _,
);
pgrx::pg_sys::RecordPageWithFreeSpace(self.raw, id, write.freespace() as _);
pgrx::pg_sys::FreeSpaceMapVacuumRange(self.raw, id, id + 1);
continue;
}
return Some(write);
}
}
}
#[allow(dead_code)]
pub fn len(&self) -> u32 {
unsafe {
pgrx::pg_sys::RelationGetNumberOfBlocksInFork(
self.raw,
pgrx::pg_sys::ForkNumber::MAIN_FORKNUM,
)
}
}
}
32 changes: 16 additions & 16 deletions src/vchordrq/algorithm/build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::postgres::BufferWriteGuard;
use crate::postgres::Relation;
use super::RelationWrite;
use crate::vchordrq::algorithm::rabitq;
use crate::vchordrq::algorithm::tuples::*;
use crate::vchordrq::algorithm::PageGuard;
use crate::vchordrq::index::am_options::Opfamily;
use crate::vchordrq::types::VchordrqBuildOptions;
use crate::vchordrq::types::VchordrqExternalBuildOptions;
Expand Down Expand Up @@ -32,7 +32,7 @@ pub fn build<V: Vector, T: HeapRelation<V>, R: Reporter>(
vector_options: VectorOptions,
vchordrq_options: VchordrqIndexingOptions,
heap_relation: T,
relation: Relation,
relation: impl RelationWrite,
mut reporter: R,
) {
let dims = vector_options.dims;
Expand Down Expand Up @@ -75,7 +75,7 @@ pub fn build<V: Vector, T: HeapRelation<V>, R: Reporter>(
};
let mut meta = Tape::create(&relation, false);
assert_eq!(meta.first(), 0);
let mut vectors = Tape::<VectorTuple<V>>::create(&relation, true);
let mut vectors = Tape::<VectorTuple<V>, _>::create(&relation, true);
let mut pointer_of_means = Vec::<Vec<(u32, u16)>>::new();
for i in 0..structures.len() {
let mut level = Vec::new();
Expand All @@ -99,10 +99,10 @@ pub fn build<V: Vector, T: HeapRelation<V>, R: Reporter>(
let mut level = Vec::new();
for j in 0..structures[i].len() {
if i == 0 {
let tape = Tape::<Height0Tuple>::create(&relation, false);
let tape = Tape::<Height0Tuple, _>::create(&relation, false);
level.push(tape.first());
} else {
let mut tape = Tape::<Height1Tuple>::create(&relation, false);
let mut tape = Tape::<Height1Tuple, _>::create(&relation, false);
let h2_mean = &structures[i].means[j];
let h2_children = &structures[i].children[j];
for child in h2_children.iter().copied() {
Expand Down Expand Up @@ -361,18 +361,18 @@ impl Structure {
}
}

struct Tape<'a, T> {
relation: &'a Relation,
head: BufferWriteGuard,
struct Tape<'a: 'b, 'b, T, R: 'b + RelationWrite> {
relation: &'a R,
head: R::WriteGuard<'b>,
first: u32,
tracking_freespace: bool,
_phantom: PhantomData<fn(T) -> T>,
}

impl<'a, T> Tape<'a, T> {
fn create(relation: &'a Relation, tracking_freespace: bool) -> Self {
impl<'a: 'b, 'b, T, R: 'b + RelationWrite> Tape<'a, 'b, T, R> {
fn create(relation: &'a R, tracking_freespace: bool) -> Self {
let mut head = relation.extend(tracking_freespace);
head.get_mut().get_opaque_mut().skip = head.id();
head.get_opaque_mut().skip = head.id();
let first = head.id();
Self {
relation,
Expand All @@ -387,19 +387,19 @@ impl<'a, T> Tape<'a, T> {
}
}

impl<T> Tape<'_, T>
impl<'a: 'b, 'b, T, R: 'b + RelationWrite> Tape<'a, 'b, T, R>
where
T: rkyv::Serialize<AllocSerializer<8192>>,
{
fn push(&mut self, x: &T) -> (u32, u16) {
let bytes = rkyv::to_bytes(x).expect("failed to serialize");
if let Some(i) = self.head.get_mut().alloc(&bytes) {
if let Some(i) = self.head.alloc(&bytes) {
(self.head.id(), i)
} else {
let next = self.relation.extend(self.tracking_freespace);
self.head.get_mut().get_opaque_mut().next = next.id();
self.head.get_opaque_mut().next = next.id();
self.head = next;
if let Some(i) = self.head.get_mut().alloc(&bytes) {
if let Some(i) = self.head.alloc(&bytes) {
(self.head.id(), i)
} else {
panic!("tuple is too large to fit in a fresh page")
Expand Down
50 changes: 27 additions & 23 deletions src/vchordrq/algorithm/insert.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::postgres::Relation;
use super::RelationWrite;
use crate::vchordrq::algorithm::rabitq::fscan_process_lowerbound;
use crate::vchordrq::algorithm::tuples::*;
use crate::vchordrq::algorithm::vectors;
use crate::vchordrq::algorithm::PageGuard;
use base::always_equal::AlwaysEqual;
use base::distance::Distance;
use base::distance::DistanceKind;
Expand All @@ -11,7 +12,7 @@ use std::cmp::Reverse;
use std::collections::BinaryHeap;

pub fn insert<V: Vector>(
relation: Relation,
relation: impl RelationWrite + Clone,
payload: Pointer,
vector: V,
distance_kind: DistanceKind,
Expand All @@ -20,7 +21,6 @@ pub fn insert<V: Vector>(
let vector = vector.as_borrowed();
let meta_guard = relation.read(0);
let meta_tuple = meta_guard
.get()
.get(1)
.map(rkyv::check_archived_root::<MetaTuple>)
.expect("data corruption")
Expand Down Expand Up @@ -84,9 +84,8 @@ pub fn insert<V: Vector>(
let mut current = list.0;
while current != u32::MAX {
let h1_guard = relation.read(current);
for i in 1..=h1_guard.get().len() {
for i in 1..=h1_guard.len() {
let h1_tuple = h1_guard
.get()
.get(i)
.map(rkyv::check_archived_root::<Height1Tuple>)
.expect("data corruption")
Expand All @@ -110,7 +109,7 @@ pub fn insert<V: Vector>(
AlwaysEqual(h1_tuple.first),
));
}
current = h1_guard.get().get_opaque().next;
current = h1_guard.get_opaque().next;
}
}
let mut heap = BinaryHeap::from(results);
Expand Down Expand Up @@ -155,11 +154,18 @@ pub fn insert<V: Vector>(
t: code.t(),
})
.unwrap();
append(relation, list.0, &tuple, false, in_building, in_building);
append(
relation.clone(),
list.0,
&tuple,
false,
in_building,
in_building,
);
}

fn append(
relation: Relation,
relation: impl RelationWrite,
first: u32,
tuple: &[u8],
tracking_freespace: bool,
Expand All @@ -168,32 +174,30 @@ fn append(
) -> (u32, u16) {
if tracking_freespace {
if let Some(mut write) = relation.search(tuple.len()) {
let i = write.get_mut().alloc(tuple).unwrap();
let i = write.alloc(tuple).unwrap();
return (write.id(), i);
}
}
assert!(first != u32::MAX);
let mut current = first;
loop {
let read = relation.read(current);
if read.get().freespace() as usize >= tuple.len()
|| read.get().get_opaque().next == u32::MAX
{
if read.freespace() as usize >= tuple.len() || read.get_opaque().next == u32::MAX {
drop(read);
let mut write = relation.write(current, tracking_freespace);
if let Some(i) = write.get_mut().alloc(tuple) {
if let Some(i) = write.alloc(tuple) {
return (current, i);
}
if write.get().get_opaque().next == u32::MAX {
if write.get_opaque().next == u32::MAX {
let mut extend = relation.extend(tracking_freespace);
write.get_mut().get_opaque_mut().next = extend.id();
write.get_opaque_mut().next = extend.id();
drop(write);
if let Some(i) = extend.get_mut().alloc(tuple) {
if let Some(i) = extend.alloc(tuple) {
let result = (extend.id(), i);
drop(extend);
if updating_skip {
let mut past = relation.write(first, tracking_freespace);
let skip = &mut past.get_mut().get_opaque_mut().skip;
let skip = &mut past.get_opaque_mut().skip;
assert!(*skip != u32::MAX);
*skip = std::cmp::max(*skip, result.0);
}
Expand All @@ -202,16 +206,16 @@ fn append(
panic!("a tuple cannot even be fit in a fresh page");
}
}
if skipping_traversal && current == first && write.get().get_opaque().skip != first {
current = write.get().get_opaque().skip;
if skipping_traversal && current == first && write.get_opaque().skip != first {
current = write.get_opaque().skip;
} else {
current = write.get().get_opaque().next;
current = write.get_opaque().next;
}
} else {
if skipping_traversal && current == first && read.get().get_opaque().skip != first {
current = read.get().get_opaque().skip;
if skipping_traversal && current == first && read.get_opaque().skip != first {
current = read.get_opaque().skip;
} else {
current = read.get().get_opaque().next;
current = read.get_opaque().next;
}
}
}
Expand Down
Loading

0 comments on commit 3e23a14

Please sign in to comment.