Skip to content

Commit

Permalink
fix: race condition if TEMP relation is first-accessed concurrently
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <[email protected]>
  • Loading branch information
usamoi committed Mar 18, 2024
1 parent 59afde9 commit ade4603
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 65 deletions.
134 changes: 98 additions & 36 deletions src/index/am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,77 +132,137 @@ pub unsafe extern "C" fn amcostestimate(

#[pgrx::pg_guard]
pub unsafe extern "C" fn ambuild(
heap_relation: pgrx::pg_sys::Relation,
index_relation: pgrx::pg_sys::Relation,
heap: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
index_info: *mut pgrx::pg_sys::IndexInfo,
) -> *mut pgrx::pg_sys::IndexBuildResult {
initialize_meta(index, pgrx::pg_sys::ForkNumber_MAIN_FORKNUM);
let result = pgrx::PgBox::<pgrx::pg_sys::IndexBuildResult>::alloc0();
am_build::build(
index_relation,
Some((heap_relation, index_info, result.as_ptr())),
);
make_well_formed(index_relation);
make_well_formed(index);
am_build::build_insertions(index, heap, index_info, result.as_ptr());
result.into_pg()
}

#[pgrx::pg_guard]
pub unsafe extern "C" fn ambuildempty(_index: pgrx::pg_sys::Relation) {}
pub unsafe extern "C" fn ambuildempty(index: pgrx::pg_sys::Relation) {
initialize_meta(index, pgrx::pg_sys::ForkNumber_INIT_FORKNUM);
}

unsafe fn initialize_meta(index: pgrx::pg_sys::Relation, forknum: i32) {
unsafe {
let meta_buffer = scopeguard::guard(
{
let meta_buffer = pgrx::pg_sys::ReadBufferExtended(
index,
forknum,
0xFFFFFFFF,
pgrx::pg_sys::ReadBufferMode_RBM_NORMAL,
std::ptr::null_mut(),
);
pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_EXCLUSIVE as _);
meta_buffer
},
|meta_buffer| {
pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer);
},
);
let xlog = pgrx::pg_sys::GenericXLogStart(index);
pgrx::pg_sys::GenericXLogRegisterBuffer(
xlog,
*meta_buffer,
pgrx::pg_sys::GENERIC_XLOG_FULL_IMAGE as _,
);
pgrx::pg_sys::GenericXLogFinish(xlog);
}
}

#[repr(C)]
struct VectorsPageOpaqueData {
_reserved: [u8; 2048],
version: u32,
_reserved: [u8; 2044],
}

const _: () = assert!(std::mem::size_of::<VectorsPageOpaqueData>() == 2048);

unsafe fn make_well_formed(index_relation: pgrx::pg_sys::Relation) {
unsafe fn make_well_formed(index: pgrx::pg_sys::Relation) {
unsafe {
let meta_buffer = pgrx::pg_sys::ReadBuffer(index_relation, 0xFFFFFFFF /* P_NEW */);
pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_EXCLUSIVE as _);
assert!(pgrx::pg_sys::BufferGetBlockNumber(meta_buffer) == 0);
let state = pgrx::pg_sys::GenericXLogStart(index_relation);
let meta_page = pgrx::pg_sys::GenericXLogRegisterBuffer(
state,
meta_buffer,
let meta_buffer = scopeguard::guard(
{
let meta_buffer = pgrx::pg_sys::ReadBuffer(index, 0);
pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_SHARE as _);
meta_buffer
},
|meta_buffer| {
pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer);
},
);
let meta_page = pgrx::pg_sys::BufferGetPage(*meta_buffer);
let meta_header = meta_page.cast::<pgrx::pg_sys::PageHeaderData>();
let meta_offset = (*meta_header).pd_special as usize;
let meta_opaque = meta_page.add(meta_offset).cast::<VectorsPageOpaqueData>();
if (*meta_opaque).version == 0 {
return;
}
am_build::build(index);
let xlog = pgrx::pg_sys::GenericXLogStart(index);
let xlog_meta_page = pgrx::pg_sys::GenericXLogRegisterBuffer(
xlog,
*meta_buffer,
pgrx::pg_sys::GENERIC_XLOG_FULL_IMAGE as _,
);
pgrx::pg_sys::PageInit(
meta_page,
xlog_meta_page,
pgrx::pg_sys::BLCKSZ as usize,
std::mem::size_of::<VectorsPageOpaqueData>(),
);
pgrx::pg_sys::GenericXLogFinish(state);
pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer);
let xlog_meta_header = xlog_meta_page.cast::<pgrx::pg_sys::PageHeaderData>();
let xlog_meta_offset = (*xlog_meta_header).pd_special as usize;
let xlog_meta_opaque = xlog_meta_page
.add(xlog_meta_offset)
.cast::<VectorsPageOpaqueData>();
(*xlog_meta_opaque).version = 1;
pgrx::pg_sys::GenericXLogFinish(xlog);
}
}

unsafe fn check_well_formed(index_relation: pgrx::pg_sys::Relation) {
if !test_well_formed(index_relation) {
am_build::build(index_relation, None);
make_well_formed(index_relation);
unsafe fn check_well_formed(index: pgrx::pg_sys::Relation) {
if !test_well_formed(index) {
make_well_formed(index);
}
}

unsafe fn test_well_formed(index_relation: pgrx::pg_sys::Relation) -> bool {
pgrx::pg_sys::RelationGetNumberOfBlocksInFork(
index_relation,
pgrx::pg_sys::ForkNumber_MAIN_FORKNUM,
) == 1
unsafe fn test_well_formed(index: pgrx::pg_sys::Relation) -> bool {
unsafe {
let meta_buffer = scopeguard::guard(pgrx::pg_sys::ReadBuffer(index, 0), |meta_buffer| {
pgrx::pg_sys::ReleaseBuffer(meta_buffer);
});
let _lock_meta_buffer = scopeguard::guard(
pgrx::pg_sys::LockBuffer(*meta_buffer, pgrx::pg_sys::BUFFER_LOCK_SHARE as _),
|_| {
pgrx::pg_sys::LockBuffer(*meta_buffer, pgrx::pg_sys::BUFFER_LOCK_UNLOCK as _);
},
);
let meta_page = pgrx::pg_sys::BufferGetPage(*meta_buffer);
let meta_header = meta_page.cast::<pgrx::pg_sys::PageHeaderData>();
let meta_offset = (*meta_header).pd_special as usize;
let meta_opaque = meta_page.add(meta_offset).cast::<VectorsPageOpaqueData>();
(*meta_opaque).version == 1
}
}

#[pgrx::pg_guard]
pub unsafe extern "C" fn aminsert(
index_relation: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
values: *mut Datum,
is_null: *mut bool,
heap_tid: pgrx::pg_sys::ItemPointer,
_heap_relation: pgrx::pg_sys::Relation,
_heap: pgrx::pg_sys::Relation,
_check_unique: pgrx::pg_sys::IndexUniqueCheck,
_index_unchanged: bool,
_index_info: *mut pgrx::pg_sys::IndexInfo,
) -> bool {
check_well_formed(index_relation);
let oid = (*index_relation).rd_id;
check_well_formed(index);
let oid = (*index).rd_id;
let id = get_handle(oid);
let vector = from_datum(*values.add(0), *is_null.add(0));
if let Some(v) = vector {
Expand All @@ -213,14 +273,14 @@ pub unsafe extern "C" fn aminsert(

#[pgrx::pg_guard]
pub unsafe extern "C" fn ambeginscan(
index_relation: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
n_keys: std::os::raw::c_int,
n_orderbys: std::os::raw::c_int,
) -> pgrx::pg_sys::IndexScanDesc {
check_well_formed(index_relation);
check_well_formed(index);
assert!(n_keys == 0);
assert!(n_orderbys == 1);
am_scan::make_scan(index_relation)
am_scan::make_scan(index)
}

#[pgrx::pg_guard]
Expand Down Expand Up @@ -261,6 +321,8 @@ pub unsafe extern "C" fn ambulkdelete(
) -> *mut pgrx::pg_sys::IndexBulkDeleteResult {
if !test_well_formed((*info).index) {
pgrx::warning!("The vector index is not initialized.");
let result = pgrx::PgBox::<pgrx::pg_sys::IndexBulkDeleteResult>::alloc0();
return result.into_pg();
}
let oid = (*(*info).index).rd_id;
let id = get_handle(oid);
Expand Down
42 changes: 20 additions & 22 deletions src/index/am_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@ use pgrx::pg_sys::{IndexBuildResult, IndexInfo, RelationData};

pub struct Builder {
pub rpc: ClientRpc,
pub heap_relation: *mut RelationData,
pub heap: *mut RelationData,
pub index_info: *mut IndexInfo,
pub result: *mut IndexBuildResult,
}

pub unsafe fn build(
index: pgrx::pg_sys::Relation,
data: Option<(*mut RelationData, *mut IndexInfo, *mut IndexBuildResult)>,
) {
pub unsafe fn build(index: pgrx::pg_sys::Relation) {
let oid = (*index).rd_id;
let id = get_handle(oid);
let options = options(index);
Expand All @@ -31,26 +28,27 @@ pub unsafe fn build(
}
}
super::hook_maintain::maintain_index_in_index_create(id);
if let Some((heap_relation, index_info, result)) = data {
let mut builder = Builder {
rpc,
heap_relation,
index_info,
result,
};
pgrx::pg_sys::IndexBuildHeapScan(
heap_relation,
index,
index_info,
Some(callback),
&mut builder,
);
}
}

pub unsafe fn build_insertions(
index: pgrx::pg_sys::Relation,
heap: *mut RelationData,
index_info: *mut IndexInfo,
result: *mut IndexBuildResult,
) {
let rpc = check_client(crate::ipc::client());
let mut builder = Builder {
rpc,
heap,
index_info,
result,
};
pgrx::pg_sys::IndexBuildHeapScan(heap, index, index_info, Some(callback), &mut builder);
}

#[pgrx::pg_guard]
unsafe extern "C" fn callback(
index_relation: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
ctid: pgrx::pg_sys::ItemPointer,
values: *mut pgrx::pg_sys::Datum,
is_null: *mut bool,
Expand All @@ -62,7 +60,7 @@ unsafe extern "C" fn callback(
(*state.result).heap_tuples += 1.0;
return;
}
let oid = (*index_relation).rd_id;
let oid = (*index).rd_id;
let id = get_handle(oid);
let vector = from_datum(*values.add(0), *is_null.add(0));
let vector = match vector {
Expand Down
4 changes: 2 additions & 2 deletions src/index/am_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ pub enum Scanner {
Empty {},
}

pub unsafe fn make_scan(index_relation: pgrx::pg_sys::Relation) -> pgrx::pg_sys::IndexScanDesc {
pub unsafe fn make_scan(index: pgrx::pg_sys::Relation) -> pgrx::pg_sys::IndexScanDesc {
use pgrx::PgMemoryContexts;

let scan = pgrx::pg_sys::RelationGetIndexScan(index_relation, 0, 1);
let scan = pgrx::pg_sys::RelationGetIndexScan(index, 0, 1);

(*scan).xs_recheck = false;
(*scan).xs_recheckorderby = false;
Expand Down
9 changes: 4 additions & 5 deletions src/index/am_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ unsafe fn convert_varlena_to_soi(
}
}

pub unsafe fn options(index_relation: pgrx::pg_sys::Relation) -> IndexOptions {
let opfamily = unsafe { (*index_relation).rd_opfamily.read() };
let att = unsafe { &mut *(*index_relation).rd_att };
pub unsafe fn options(index: pgrx::pg_sys::Relation) -> IndexOptions {
let opfamily = unsafe { (*index).rd_opfamily.read() };
let att = unsafe { &mut *(*index).rd_att };
let atts = unsafe { att.attrs.as_slice(att.natts as _) };
if atts.is_empty() {
pgrx::error!("indexing on no columns is not supported");
Expand All @@ -116,8 +116,7 @@ pub unsafe fn options(index_relation: pgrx::pg_sys::Relation) -> IndexOptions {
// get v, d
let (v, d) = convert_opfamily_to_vd(opfamily).unwrap();
// get segment, optimizing, indexing
let (segment, optimizing, indexing) =
unsafe { convert_varlena_to_soi((*index_relation).rd_options) };
let (segment, optimizing, indexing) = unsafe { convert_varlena_to_soi((*index).rd_options) };
IndexOptions {
vector: VectorOptions { dims, v, d },
segment,
Expand Down

0 comments on commit ade4603

Please sign in to comment.