Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[experimental] feat: pinning index in memory when building #150

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ nalgebra = "=0.33.0"
# lock rkyv version forever so that data is always compatible
rkyv = { version = "=0.7.45", features = ["validation"] }

bincode = "1.3.3"
half = { version = "2.4.1", features = ["rkyv"] }
log = "0.4.22"
paste = "1"
Expand Down
2 changes: 0 additions & 2 deletions src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl Page {
assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize);
this
}
#[allow(dead_code)]
pub fn clone_into_boxed(&self) -> Box<Self> {
let mut result = Box::new_uninit();
unsafe {
Expand Down Expand Up @@ -357,7 +356,6 @@ impl Relation {
}
}
}
#[allow(dead_code)]
pub fn len(&self) -> u32 {
unsafe {
pgrx::pg_sys::RelationGetNumberOfBlocksInFork(
Expand Down
201 changes: 197 additions & 4 deletions src/vchordrq/index/am.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::postgres::Relation;
use crate::vchordrq::algorithm;
use crate::postgres::{Page, Relation};
use crate::vchordrq::algorithm::build::{HeapRelation, Reporter};
use crate::vchordrq::algorithm::tuples::Vector;
use crate::vchordrq::algorithm::{self, PageGuard};
use crate::vchordrq::algorithm::{RelationRead, RelationWrite};
use crate::vchordrq::index::am_options::{Opfamily, Reloption};
use crate::vchordrq::index::am_scan::Scanner;
use crate::vchordrq::index::utils::{ctid_to_pointer, pointer_to_ctid};
Expand All @@ -12,6 +13,8 @@ use base::vector::VectOwned;
use half::f16;
use pgrx::datum::Internal;
use pgrx::pg_sys::Datum;
use std::collections::HashMap;
use std::ops::Deref;

static mut RELOPT_KIND_VCHORDRQ: pgrx::pg_sys::relopt_kind::Type = 0;

Expand Down Expand Up @@ -259,7 +262,52 @@ pub unsafe extern "C" fn ambuild(
reporter.clone(),
),
}
if let Some(leader) = unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent) }
let cache = {
let n = index_relation.len();
let mut dir = HashMap::<u32, usize>::with_capacity(n as _);
let mut pages = Vec::<Box<Page>>::new();
{
use crate::vchordrq::algorithm::tuples::{Height1Tuple, MetaTuple};
let mut read = |id| {
let result = index_relation.read(id);
dir.insert(id, pages.len());
pages.push(result.clone_into_boxed());
result
};
let meta_guard = read(0);
let meta_tuple = meta_guard
.get(1)
.map(rkyv::check_archived_root::<MetaTuple>)
.expect("data corruption")
.expect("data corruption");
let mut firsts = vec![meta_tuple.first];
let mut make_firsts = |firsts| {
let mut results = Vec::new();
for first in firsts {
let mut current = first;
while current != u32::MAX {
let h1_guard = read(current);
for i in 1..=h1_guard.len() {
let h1_tuple = h1_guard
.get(i)
.map(rkyv::check_archived_root::<Height1Tuple>)
.expect("data corruption")
.expect("data corruption");
results.push(h1_tuple.first);
}
current = h1_guard.get_opaque().next;
}
}
results
};
for _ in (1..meta_tuple.height_of_root).rev() {
firsts = make_firsts(firsts);
}
}
(dir, pages)
};
if let Some(leader) =
unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent, &cache) }
{
unsafe {
parallel_build(
Expand All @@ -269,6 +317,8 @@ pub unsafe extern "C" fn ambuild(
leader.tablescandesc,
leader.vchordrqshared,
Some(reporter),
&*leader.cache_0,
&*leader.cache_1,
);
leader.wait();
let nparticipants = leader.nparticipants;
Expand All @@ -290,6 +340,10 @@ pub unsafe extern "C" fn ambuild(
let mut indtuples = 0;
reporter.tuples_done(indtuples);
let relation = unsafe { Relation::new(index) };
let relation = CachingRelation {
cache: &(cache.0, cache.1.iter().map(|x| x.as_ref()).collect()),
relation,
};
match opfamily.vector_kind() {
VectorKind::Vecf32 => {
HeapRelation::<VectOwned<f32>>::traverse(
Expand Down Expand Up @@ -330,11 +384,78 @@ pub unsafe extern "C" fn ambuild(
unsafe { pgrx::pgbox::PgBox::<pgrx::pg_sys::IndexBuildResult>::alloc0().into_pg() }
}

#[derive(Clone)]
struct CachingRelation<'a, R> {
cache: &'a (HashMap<u32, usize>, Vec<&'a Page>),
relation: R,
}

enum CachingRelationReadGuard<'a, G> {
Wrapping(G),
Cached(u32, &'a Page),
}

impl<G: PageGuard> PageGuard for CachingRelationReadGuard<'_, G> {
fn id(&self) -> u32 {
match self {
CachingRelationReadGuard::Wrapping(x) => x.id(),
CachingRelationReadGuard::Cached(id, _) => *id,
}
}
}

impl<G: Deref<Target = Page>> Deref for CachingRelationReadGuard<'_, G> {
type Target = Page;

fn deref(&self) -> &Self::Target {
match self {
CachingRelationReadGuard::Wrapping(x) => x,
CachingRelationReadGuard::Cached(_, page) => page,
}
}
}

impl<R: RelationRead> RelationRead for CachingRelation<'_, R> {
type ReadGuard<'a>
= CachingRelationReadGuard<'a, R::ReadGuard<'a>>
where
Self: 'a;

fn read(&self, id: u32) -> Self::ReadGuard<'_> {
if let Some(&x) = self.cache.0.get(&id) {
CachingRelationReadGuard::Cached(id, self.cache.1[x])
} else {
CachingRelationReadGuard::Wrapping(self.relation.read(id))
}
}
}

impl<R: RelationWrite> RelationWrite for CachingRelation<'_, R> {
type WriteGuard<'a>
= R::WriteGuard<'a>
where
Self: 'a;

fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> {
self.relation.write(id, tracking_freespace)
}

fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> {
self.relation.extend(tracking_freespace)
}

fn search(&self, freespace: usize) -> Option<Self::WriteGuard<'_>> {
self.relation.search(freespace)
}
}

struct VchordrqShared {
/* Immutable state */
heaprelid: pgrx::pg_sys::Oid,
indexrelid: pgrx::pg_sys::Oid,
isconcurrent: bool,
est_cache_0: usize,
est_cache_1: usize,

/* Worker progress */
workersdonecv: pgrx::pg_sys::ConditionVariable,
Expand All @@ -360,6 +481,8 @@ struct VchordrqLeader {
nparticipants: i32,
vchordrqshared: *mut VchordrqShared,
tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData,
cache_0: *const [u8],
cache_1: *const [u8],
snapshot: pgrx::pg_sys::Snapshot,
}

Expand All @@ -368,7 +491,10 @@ impl VchordrqLeader {
heap: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
isconcurrent: bool,
cache: &(HashMap<u32, usize>, Vec<Box<Page>>),
) -> Option<Self> {
let cache_mapping: Vec<u8> = bincode::serialize(&cache.0).unwrap();

unsafe fn compute_parallel_workers(
heap: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
Expand Down Expand Up @@ -419,11 +545,17 @@ impl VchordrqLeader {
}
let est_tablescandesc =
unsafe { pgrx::pg_sys::table_parallelscan_estimate(heap, snapshot) };
let est_cache_0 = cache_mapping.len();
let est_cache_1 = cache.1.len() * size_of::<Page>();
unsafe {
estimate_chunk(&mut (*pcxt).estimator, size_of::<VchordrqShared>());
estimate_keys(&mut (*pcxt).estimator, 1);
estimate_chunk(&mut (*pcxt).estimator, est_tablescandesc);
estimate_keys(&mut (*pcxt).estimator, 1);
estimate_chunk(&mut (*pcxt).estimator, est_cache_0);
estimate_keys(&mut (*pcxt).estimator, 1);
estimate_chunk(&mut (*pcxt).estimator, est_cache_1);
estimate_keys(&mut (*pcxt).estimator, 1);
}

unsafe {
Expand All @@ -450,6 +582,8 @@ impl VchordrqLeader {
mutex: std::mem::zeroed(),
nparticipantsdone: 0,
indtuples: 0,
est_cache_0,
est_cache_1,
});
pgrx::pg_sys::ConditionVariableInit(&raw mut (*vchordrqshared).workersdonecv);
pgrx::pg_sys::SpinLockInit(&raw mut (*vchordrqshared).mutex);
Expand All @@ -463,9 +597,29 @@ impl VchordrqLeader {
tablescandesc
};

let cache_0 = unsafe {
let cache_0 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_0).cast::<u8>();
std::ptr::copy(cache_mapping.as_ptr(), cache_0, est_cache_0);
core::ptr::slice_from_raw_parts(cache_0, est_cache_0)
};

let cache_1 = unsafe {
let cache_1 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_1).cast::<u8>();
for i in 0..cache.1.len() {
std::ptr::copy(
(cache.1[i].deref() as *const Page).cast::<u8>(),
cache_1.cast::<Page>().add(i).cast(),
size_of::<Page>(),
);
}
core::ptr::slice_from_raw_parts(cache_1, est_cache_1)
};

unsafe {
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000001, vchordrqshared.cast());
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000002, tablescandesc.cast());
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000003, cache_0 as _);
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000004, cache_1 as _);
}

unsafe {
Expand All @@ -491,6 +645,8 @@ impl VchordrqLeader {
nparticipants: nworkers_launched + 1,
vchordrqshared,
tablescandesc,
cache_0,
cache_1,
snapshot,
})
}
Expand Down Expand Up @@ -530,6 +686,18 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000002, false)
.cast::<pgrx::pg_sys::ParallelTableScanDescData>()
};
let cache_0 = unsafe {
std::slice::from_raw_parts(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000003, false).cast::<u8>(),
(*vchordrqshared).est_cache_0,
)
};
let cache_1 = unsafe {
std::slice::from_raw_parts(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000004, false).cast::<u8>(),
(*vchordrqshared).est_cache_1,
)
};
let heap_lockmode;
let index_lockmode;
if unsafe { !(*vchordrqshared).isconcurrent } {
Expand All @@ -547,7 +715,16 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main(
}

unsafe {
parallel_build(index, heap, index_info, tablescandesc, vchordrqshared, None);
parallel_build(
index,
heap,
index_info,
tablescandesc,
vchordrqshared,
None,
cache_0,
cache_1,
);
}

unsafe {
Expand All @@ -563,6 +740,8 @@ unsafe fn parallel_build(
tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData,
vchordrqshared: *mut VchordrqShared,
mut reporter: Option<PgReporter>,
cache_0: &[u8],
cache_1: &[u8],
) {
#[derive(Debug, Clone)]
pub struct Heap {
Expand Down Expand Up @@ -628,6 +807,20 @@ unsafe fn parallel_build(
}

let index_relation = unsafe { Relation::new(index) };
let index_relation = CachingRelation {
cache: {
let cache_0: HashMap<u32, usize> = bincode::deserialize(cache_0).unwrap();
assert!(cache_1.len() % size_of::<Page>() == 0);
let n = cache_1.len() / size_of::<Page>();
let cache_1 = unsafe {
(0..n)
.map(|i| &*cache_1.as_ptr().cast::<Page>().add(i))
.collect::<Vec<&Page>>()
};
&(cache_0, cache_1)
},
relation: index_relation,
};

let scan = unsafe { pgrx::pg_sys::table_beginscan_parallel(heap, tablescandesc) };
let opfamily = unsafe { am_options::opfamily(index) };
Expand Down
Loading