From 6c796f27aebc026495ea602b8db873d57ed870e6 Mon Sep 17 00:00:00 2001 From: usamoi Date: Fri, 20 Dec 2024 16:40:48 +0800 Subject: [PATCH] [experiment] feat: reranking in heap table Signed-off-by: usamoi --- src/postgres.rs | 3 + src/vchordrq/algorithm/build.rs | 1 - src/vchordrq/algorithm/insert.rs | 28 +------ src/vchordrq/algorithm/scan.rs | 28 +++---- src/vchordrq/algorithm/tuples.rs | 36 +++++++-- src/vchordrq/algorithm/vacuum.rs | 119 +++++++++--------------------- src/vchordrq/algorithm/vectors.rs | 34 ++++++--- src/vchordrq/index/am.rs | 49 +++++++++++- src/vchordrq/index/am_scan.rs | 32 ++++++-- 9 files changed, 176 insertions(+), 154 deletions(-) diff --git a/src/postgres.rs b/src/postgres.rs index c24d669..a06c0b7 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -366,4 +366,7 @@ impl Relation { ) } } + pub fn raw(&self) -> pgrx::pg_sys::Relation { + self.raw + } } diff --git a/src/vchordrq/algorithm/build.rs b/src/vchordrq/algorithm/build.rs index 6554bfc..57a7007 100644 --- a/src/vchordrq/algorithm/build.rs +++ b/src/vchordrq/algorithm/build.rs @@ -85,7 +85,6 @@ pub fn build, R: Reporter>( let mut chain = Err(metadata); for i in (0..slices.len()).rev() { chain = Ok(vectors.push(&VectorTuple { - payload: None, slice: slices[i].to_vec(), chain, })); diff --git a/src/vchordrq/algorithm/insert.rs b/src/vchordrq/algorithm/insert.rs index 323fa24..7c70873 100644 --- a/src/vchordrq/algorithm/insert.rs +++ b/src/vchordrq/algorithm/insert.rs @@ -35,35 +35,13 @@ pub fn insert( } else { None }; - let h0_vector = { - let (metadata, slices) = V::vector_split(vector); - let mut chain = Err(metadata); - for i in (0..slices.len()).rev() { - let tuple = rkyv::to_bytes::<_, 8192>(&VectorTuple:: { - slice: slices[i].to_vec(), - payload: Some(payload.as_u64()), - chain, - }) - .unwrap(); - chain = Ok(append( - relation.clone(), - meta_tuple.vectors_first, - &tuple, - true, - true, - true, - )); - } - chain.ok().unwrap() - }; let h0_payload = payload.as_u64(); let mut list = { - let Some((_, original)) = vectors::vector_dist::( + let Some((_, original)) = vectors::vector_dist_by_mean::( relation.clone(), vector, meta_tuple.mean, None, - None, is_residual, ) else { panic!("data corruption") @@ -117,11 +95,10 @@ pub fn insert( { while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); - let Some((Some(dis_u), original)) = vectors::vector_dist::( + let Some((Some(dis_u), original)) = vectors::vector_dist_by_mean::( relation.clone(), vector, mean, - None, Some(distance_kind), is_residual, ) else { @@ -145,7 +122,6 @@ pub fn insert( V::rabitq_code(dims, vector) }; let tuple = rkyv::to_bytes::<_, 8192>(&Height0Tuple { - mean: h0_vector, payload: h0_payload, dis_u_2: code.dis_u_2, factor_ppc: code.factor_ppc, diff --git a/src/vchordrq/algorithm/scan.rs b/src/vchordrq/algorithm/scan.rs index 42357c0..7a8aebc 100644 --- a/src/vchordrq/algorithm/scan.rs +++ b/src/vchordrq/algorithm/scan.rs @@ -12,12 +12,13 @@ use std::collections::BinaryHeap; pub fn scan( relation: impl RelationRead + Clone, - vector: V, + raw_vector: V, distance_kind: DistanceKind, probes: Vec, epsilon: f32, + fetch_vector: impl Fn(u64) -> Option + Copy + 'static, ) -> impl Iterator { - let vector = vector.as_borrowed(); + let vector = raw_vector.as_borrowed(); let meta_guard = relation.read(0); let meta_tuple = meta_guard .get(1) @@ -36,12 +37,11 @@ pub fn scan( None }; let mut lists: Vec<_> = vec![{ - let Some((_, original)) = vectors::vector_dist::( + let Some((_, original)) = vectors::vector_dist_by_mean::( relation.clone(), vector.as_borrowed(), meta_tuple.mean, None, - None, is_residual, ) else { panic!("data corruption") @@ -98,11 +98,10 @@ pub fn scan( std::iter::from_fn(|| { while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); - let Some((Some(dis_u), original)) = vectors::vector_dist::( + let Some((Some(dis_u), original)) = vectors::vector_dist_by_mean::( relation.clone(), vector.as_borrowed(), mean, - None, Some(distance_kind), is_residual, ) else { @@ -156,11 +155,7 @@ pub fn scan( ), epsilon, ); - results.push(( - Reverse(lowerbounds), - AlwaysEqual(h0_tuple.mean), - AlwaysEqual(h0_tuple.payload), - )); + results.push((Reverse(lowerbounds), AlwaysEqual(h0_tuple.payload))); } current = h0_guard.get_opaque().next; } @@ -169,14 +164,13 @@ pub fn scan( let mut cache = BinaryHeap::<(Reverse, _)>::new(); std::iter::from_fn(move || { while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { - let (_, AlwaysEqual(mean), AlwaysEqual(pay_u)) = heap.pop().unwrap(); - let Some((Some(dis_u), _)) = vectors::vector_dist::( - relation.clone(), - vector.as_borrowed(), - mean, - Some(pay_u), + let (_, AlwaysEqual(pay_u)) = heap.pop().unwrap(); + let Some((Some(dis_u), _)) = vectors::vector_dist_by_fetch::( + raw_vector.as_borrowed(), + pay_u, Some(distance_kind), false, + fetch_vector, ) else { continue; }; diff --git a/src/vchordrq/algorithm/tuples.rs b/src/vchordrq/algorithm/tuples.rs index 40a795c..3d802b7 100644 --- a/src/vchordrq/algorithm/tuples.rs +++ b/src/vchordrq/algorithm/tuples.rs @@ -1,8 +1,8 @@ use super::rabitq::{self, Code, Lut}; use crate::vchordrq::types::OwnedVector; -use base::distance::DistanceKind; +use base::distance::{Distance, DistanceKind}; use base::simd::ScalarLike; -use base::vector::{VectOwned, VectorOwned}; +use base::vector::{VectOwned, VectorBorrowed, VectorOwned}; use half::f16; use rkyv::{Archive, ArchiveUnsized, CheckBytes, Deserialize, Serialize}; @@ -51,6 +51,11 @@ pub trait Vector: VectorOwned { left: Self::Metadata, right: Self::Metadata, ) -> f32; + fn distance( + distance_kind: DistanceKind, + lhs: Self::Borrowed<'_>, + rhs: Self::Borrowed<'_>, + ) -> Distance; fn random_projection(vector: Self::Borrowed<'_>) -> Self; @@ -120,6 +125,18 @@ impl Vector for VectOwned { ) -> f32 { accumulator.1 } + fn distance( + distance_kind: DistanceKind, + lhs: Self::Borrowed<'_>, + rhs: Self::Borrowed<'_>, + ) -> Distance { + match distance_kind { + DistanceKind::L2 => lhs.operator_l2(rhs), + DistanceKind::Dot => lhs.operator_dot(rhs), + DistanceKind::Hamming => unreachable!(), + DistanceKind::Jaccard => unreachable!(), + } + } fn random_projection(vector: Self::Borrowed<'_>) -> Self { Self::new(crate::projection::project(vector.slice())) @@ -201,6 +218,18 @@ impl Vector for VectOwned { ) -> f32 { accumulator.1 } + fn distance( + distance_kind: DistanceKind, + lhs: Self::Borrowed<'_>, + rhs: Self::Borrowed<'_>, + ) -> Distance { + match distance_kind { + DistanceKind::L2 => lhs.operator_l2(rhs), + DistanceKind::Dot => lhs.operator_dot(rhs), + DistanceKind::Hamming => unreachable!(), + DistanceKind::Jaccard => unreachable!(), + } + } fn random_projection(vector: Self::Borrowed<'_>) -> Self { Self::new(f16::vector_from_f32(&crate::projection::project( @@ -246,7 +275,6 @@ pub struct MetaTuple { #[archive(check_bytes)] pub struct VectorTuple { pub slice: Vec, - pub payload: Option, pub chain: Result<(u32, u16), V::Metadata>, } @@ -268,8 +296,6 @@ pub struct Height1Tuple { #[derive(Clone, PartialEq, Archive, Serialize, Deserialize)] #[archive(check_bytes)] pub struct Height0Tuple { - // raw vector - pub mean: (u32, u16), // for height 0 tuple, it's pointers to heap relation pub payload: u64, // RaBitQ algorithm diff --git a/src/vchordrq/algorithm/vacuum.rs b/src/vchordrq/algorithm/vacuum.rs index c77bd21..b6d6457 100644 --- a/src/vchordrq/algorithm/vacuum.rs +++ b/src/vchordrq/algorithm/vacuum.rs @@ -7,106 +7,53 @@ pub fn vacuum( delay: impl Fn(), callback: impl Fn(Pointer) -> bool, ) { - // step 1: vacuum height_0_tuple - { - let meta_guard = relation.read(0); - let meta_tuple = meta_guard - .get(1) - .map(rkyv::check_archived_root::) - .expect("data corruption") - .expect("data corruption"); - let mut firsts = vec![meta_tuple.first]; - let make_firsts = |firsts| { - let mut results = Vec::new(); - for first in firsts { - let mut current = first; - while current != u32::MAX { - let h1_guard = relation.read(current); - for i in 1..=h1_guard.len() { - let h1_tuple = h1_guard - .get(i) - .map(rkyv::check_archived_root::) - .expect("data corruption") - .expect("data corruption"); - results.push(h1_tuple.first); - } - current = h1_guard.get_opaque().next; - } - } - results - }; - for _ in (1..meta_tuple.height_of_root).rev() { - firsts = make_firsts(firsts); - } + let meta_guard = relation.read(0); + let meta_tuple = meta_guard + .get(1) + .map(rkyv::check_archived_root::) + .expect("data corruption") + .expect("data corruption"); + let mut firsts = vec![meta_tuple.first]; + let make_firsts = |firsts| { + let mut results = Vec::new(); for first in firsts { let mut current = first; while current != u32::MAX { - delay(); - let mut h0_guard = relation.write(current, false); - let mut reconstruct_removes = Vec::new(); - for i in 1..=h0_guard.len() { - let h0_tuple = h0_guard + let h1_guard = relation.read(current); + for i in 1..=h1_guard.len() { + let h1_tuple = h1_guard .get(i) - .map(rkyv::check_archived_root::) + .map(rkyv::check_archived_root::) .expect("data corruption") .expect("data corruption"); - if callback(Pointer::new(h0_tuple.payload)) { - reconstruct_removes.push(i); - } + results.push(h1_tuple.first); } - h0_guard.reconstruct(&reconstruct_removes); - current = h0_guard.get_opaque().next; + current = h1_guard.get_opaque().next; } } + results + }; + for _ in (1..meta_tuple.height_of_root).rev() { + firsts = make_firsts(firsts); } - // step 2: vacuum vector_tuple - { - let mut current = { - let meta_guard = relation.read(0); - let meta_tuple = meta_guard - .get(1) - .map(rkyv::check_archived_root::) - .expect("data corruption") - .expect("data corruption"); - meta_tuple.vectors_first - }; + for first in firsts { + let mut current = first; while current != u32::MAX { delay(); - let read = relation.read(current); - let flag = 'flag: { - for i in 1..=read.len() { - let Some(vector_tuple) = read.get(i) else { - continue; - }; - let vector_tuple = - unsafe { rkyv::archived_root::>(vector_tuple) }; - if let Some(payload) = vector_tuple.payload.as_ref().copied() { - if callback(Pointer::new(payload)) { - break 'flag true; - } - } - } - false - }; - if flag { - drop(read); - let mut write = relation.write(current, true); - for i in 1..=write.len() { - let Some(vector_tuple) = write.get(i) else { - continue; - }; - let vector_tuple = - unsafe { rkyv::archived_root::>(vector_tuple) }; - if let Some(payload) = vector_tuple.payload.as_ref().copied() { - if callback(Pointer::new(payload)) { - write.free(i); - } - } + let mut h0_guard = relation.write(current, false); + let mut reconstruct_removes = Vec::new(); + for i in 1..=h0_guard.len() { + let h0_tuple = h0_guard + .get(i) + .map(rkyv::check_archived_root::) + .expect("data corruption") + .expect("data corruption"); + if callback(Pointer::new(h0_tuple.payload)) { + reconstruct_removes.push(i); } - current = write.get_opaque().next; - } else { - current = read.get_opaque().next; } + h0_guard.reconstruct(&reconstruct_removes); + current = h0_guard.get_opaque().next; } } } diff --git a/src/vchordrq/algorithm/vectors.rs b/src/vchordrq/algorithm/vectors.rs index 06075d3..27e4eea 100644 --- a/src/vchordrq/algorithm/vectors.rs +++ b/src/vchordrq/algorithm/vectors.rs @@ -4,20 +4,38 @@ use crate::vchordrq::algorithm::tuples::VectorTuple; use base::distance::Distance; use base::distance::DistanceKind; -pub fn vector_dist( +pub fn vector_dist_by_fetch( + vector: V::Borrowed<'_>, + payload: u64, + for_distance: Option, + for_original: bool, + fetch_vector: impl Fn(u64) -> Option, +) -> Option<(Option, Option)> { + if for_distance.is_none() && !for_original { + return Some((None, None)); + } + let original = fetch_vector(payload)?; + Some(( + for_distance + .map(|distance_kind| V::distance(distance_kind, original.as_borrowed(), vector)), + for_original.then_some(original), + )) +} + +pub fn vector_dist_by_mean( relation: impl RelationRead, vector: V::Borrowed<'_>, mean: (u32, u16), - payload: Option, for_distance: Option, for_original: bool, ) -> Option<(Option, Option)> { - if for_distance.is_none() && !for_original && payload.is_none() { + if for_distance.is_none() && !for_original { return Some((None, None)); } let (left_metadata, slices) = V::vector_split(vector); let mut cursor = Ok(mean); - let mut result = for_distance.map(|x| V::distance_begin(x)); + let mut result: Option<::DistanceAccumulator> = + for_distance.map(|x| V::distance_begin(x)); let mut original = Vec::new(); for i in 0..slices.len() { let Ok(mean) = cursor else { @@ -30,10 +48,6 @@ pub fn vector_dist( return None; }; let vector_tuple = unsafe { rkyv::archived_root::>(vector_tuple) }; - if vector_tuple.payload != payload { - // fails consistency check - return None; - } if let Some(result) = result.as_mut() { V::distance_next(result, slices[i], &vector_tuple.slice); } @@ -63,10 +77,6 @@ pub fn vector_warm(relation: impl RelationRead, mean: (u32, u16)) { return; }; let vector_tuple = unsafe { rkyv::archived_root::>(vector_tuple) }; - if vector_tuple.payload.is_some() { - // fails consistency check - return; - } cursor = match &vector_tuple.chain { rkyv::result::ArchivedResult::Ok(x) => Ok(*x), rkyv::result::ArchivedResult::Err(x) => Err(V::metadata_from_archived(x)), diff --git a/src/vchordrq/index/am.rs b/src/vchordrq/index/am.rs index 1dc7cdc..6f48983 100644 --- a/src/vchordrq/index/am.rs +++ b/src/vchordrq/index/am.rs @@ -6,7 +6,7 @@ use crate::vchordrq::index::am_options::{Opfamily, Reloption}; use crate::vchordrq::index::am_scan::Scanner; use crate::vchordrq::index::utils::{ctid_to_pointer, pointer_to_ctid}; use crate::vchordrq::index::{am_options, am_scan}; -use crate::vchordrq::types::VectorKind; +use crate::vchordrq::types::{OwnedVector, VectorKind}; use base::search::Pointer; use base::vector::VectOwned; use half::f16; @@ -855,7 +855,9 @@ pub unsafe extern "C" fn amgettuple( } let scanner = unsafe { (*scan).opaque.cast::().as_mut().unwrap_unchecked() }; let relation = unsafe { Relation::new((*scan).indexRelation) }; - if let Some((pointer, recheck)) = am_scan::scan_next(scanner, relation) { + if let Some((pointer, recheck)) = + am_scan::scan_next(scanner, relation, unsafe { (*scan).heapRelation }) + { let ctid = pointer_to_ctid(pointer); unsafe { (*scan).xs_heaptid = ctid; @@ -919,3 +921,46 @@ pub unsafe extern "C" fn amvacuumcleanup( ) -> *mut pgrx::pg_sys::IndexBulkDeleteResult { std::ptr::null_mut() } + +pub(super) unsafe fn fetch_vector( + opfamily: Opfamily, + heap_relation: pgrx::pg_sys::Relation, + attnum: i16, + payload: u64, +) -> Option { + unsafe { + let slot = pgrx::pg_sys::table_slot_create(heap_relation, std::ptr::null_mut()); + let table_am = (*heap_relation).rd_tableam; + let fetch_row_version = (*table_am).tuple_fetch_row_version.unwrap(); + let mut ctid = pointer_to_ctid(Pointer::new(payload)); + fetch_row_version( + heap_relation, + &mut ctid, + &raw mut pgrx::pg_sys::SnapshotAnyData, + slot, + ); + assert!(attnum > 0); + if attnum > (*slot).tts_nvalid { + pgrx::pg_sys::slot_getsomeattrs(slot, attnum as i32); + } + let result = opfamily.datum_to_vector( + (*slot).tts_values.add(attnum as usize - 1).read(), + (*slot).tts_isnull.add(attnum as usize - 1).read(), + ); + if !slot.is_null() { + pgrx::pg_sys::ExecDropSingleTupleTableSlot(slot); + } + result + } +} + +pub(super) unsafe fn get_attribute_number_from_index( + index: pgrx::pg_sys::Relation, +) -> pgrx::pg_sys::AttrNumber { + unsafe { + let a = (*index).rd_index; + let natts = (*a).indnatts; + assert!(natts == 1); + (*a).indkey.values.as_slice(natts as _)[0] + } +} diff --git a/src/vchordrq/index/am_scan.rs b/src/vchordrq/index/am_scan.rs index 1b78ff0..8c3a87e 100644 --- a/src/vchordrq/index/am_scan.rs +++ b/src/vchordrq/index/am_scan.rs @@ -5,6 +5,7 @@ use crate::vchordrq::algorithm::tuples::Vector; use crate::vchordrq::gucs::executing::epsilon; use crate::vchordrq::gucs::executing::max_scan_tuples; use crate::vchordrq::gucs::executing::probes; +use crate::vchordrq::index::am::{fetch_vector, get_attribute_number_from_index}; use crate::vchordrq::types::OwnedVector; use crate::vchordrq::types::VectorKind; use base::distance::Distance; @@ -19,7 +20,7 @@ pub enum Scanner { recheck: bool, }, Vbase { - vbase: Box>, + vbase: Box + 'static>, threshold: Option, recheck: bool, opfamily: Opfamily, @@ -66,14 +67,19 @@ pub fn scan_make( } } -pub fn scan_next(scanner: &mut Scanner, relation: Relation) -> Option<(Pointer, bool)> { +pub fn scan_next( + scanner: &mut Scanner, + relation: Relation, + heap: pgrx::pg_sys::Relation, +) -> Option<(Pointer, bool)> { + let index = relation.raw(); if let Scanner::Initial { vector, threshold, recheck, } = scanner { - if let Some((vector, opfamily)) = vector.as_ref() { + if let Some((vector, opfamily)) = vector.clone() { match opfamily.vector_kind() { VectorKind::Vecf32 => { let vbase = scan::>( @@ -82,6 +88,14 @@ pub fn scan_next(scanner: &mut Scanner, relation: Relation) -> Option<(Pointer, opfamily.distance_kind(), probes(), epsilon(), + move |payload| unsafe { + Some(VectOwned::::from_owned(fetch_vector( + opfamily, + heap, + get_attribute_number_from_index(index), + payload, + )?)) + }, ); *scanner = Scanner::Vbase { vbase: if let Some(max_scan_tuples) = max_scan_tuples() { @@ -91,7 +105,7 @@ pub fn scan_next(scanner: &mut Scanner, relation: Relation) -> Option<(Pointer, }, threshold: *threshold, recheck: *recheck, - opfamily: *opfamily, + opfamily, }; } VectorKind::Vecf16 => { @@ -101,6 +115,14 @@ pub fn scan_next(scanner: &mut Scanner, relation: Relation) -> Option<(Pointer, opfamily.distance_kind(), probes(), epsilon(), + move |payload| unsafe { + Some(VectOwned::::from_owned(fetch_vector( + opfamily, + heap, + get_attribute_number_from_index(index), + payload, + )?)) + }, ); *scanner = Scanner::Vbase { vbase: if let Some(max_scan_tuples) = max_scan_tuples() { @@ -110,7 +132,7 @@ pub fn scan_next(scanner: &mut Scanner, relation: Relation) -> Option<(Pointer, }, threshold: *threshold, recheck: *recheck, - opfamily: *opfamily, + opfamily, }; } }