Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race condition if UNLOGGED relation is first-accessed concurrently #435

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 99 additions & 36 deletions src/index/am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,77 +132,138 @@ pub unsafe extern "C" fn amcostestimate(

#[pgrx::pg_guard]
pub unsafe extern "C" fn ambuild(
heap_relation: pgrx::pg_sys::Relation,
index_relation: pgrx::pg_sys::Relation,
heap: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
index_info: *mut pgrx::pg_sys::IndexInfo,
) -> *mut pgrx::pg_sys::IndexBuildResult {
initialize_meta(index, pgrx::pg_sys::ForkNumber_MAIN_FORKNUM);
make_well_formed(index);
let result = pgrx::PgBox::<pgrx::pg_sys::IndexBuildResult>::alloc0();
am_build::build(
index_relation,
Some((heap_relation, index_info, result.as_ptr())),
);
make_well_formed(index_relation);
am_build::build_insertions(index, heap, index_info, result.as_ptr());
result.into_pg()
}

#[pgrx::pg_guard]
pub unsafe extern "C" fn ambuildempty(_index: pgrx::pg_sys::Relation) {}
pub unsafe extern "C" fn ambuildempty(index: pgrx::pg_sys::Relation) {
initialize_meta(index, pgrx::pg_sys::ForkNumber_INIT_FORKNUM);
}

unsafe fn initialize_meta(index: pgrx::pg_sys::Relation, forknum: i32) {
unsafe {
let meta_buffer = scopeguard::guard(
{
let meta_buffer = pgrx::pg_sys::ReadBufferExtended(
index,
forknum,
0xFFFFFFFF,
pgrx::pg_sys::ReadBufferMode_RBM_NORMAL,
std::ptr::null_mut(),
);
pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_EXCLUSIVE as _);
meta_buffer
},
|meta_buffer| {
pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer);
},
);
let xlog = pgrx::pg_sys::GenericXLogStart(index);
pgrx::pg_sys::GenericXLogRegisterBuffer(
xlog,
*meta_buffer,
pgrx::pg_sys::GENERIC_XLOG_FULL_IMAGE as _,
);
pgrx::pg_sys::GenericXLogFinish(xlog);
}
}

#[repr(C)]
struct VectorsPageOpaqueData {
_reserved: [u8; 2048],
version: u32,
_reserved: [u8; 2044],
}

const _: () = assert!(std::mem::size_of::<VectorsPageOpaqueData>() == 2048);

unsafe fn make_well_formed(index_relation: pgrx::pg_sys::Relation) {
unsafe fn make_well_formed(index: pgrx::pg_sys::Relation) {
unsafe {
let meta_buffer = pgrx::pg_sys::ReadBuffer(index_relation, 0xFFFFFFFF /* P_NEW */);
pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_EXCLUSIVE as _);
assert!(pgrx::pg_sys::BufferGetBlockNumber(meta_buffer) == 0);
let state = pgrx::pg_sys::GenericXLogStart(index_relation);
let meta_page = pgrx::pg_sys::GenericXLogRegisterBuffer(
state,
meta_buffer,
let meta_buffer = scopeguard::guard(
{
let meta_buffer = pgrx::pg_sys::ReadBuffer(index, 0);
pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_SHARE as _);
meta_buffer
},
|meta_buffer| {
pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer);
},
);
let meta_page = pgrx::pg_sys::BufferGetPage(*meta_buffer);
let meta_header = meta_page.cast::<pgrx::pg_sys::PageHeaderData>();
let meta_offset = (*meta_header).pd_special as usize;
let meta_opaque = meta_page.add(meta_offset).cast::<VectorsPageOpaqueData>();
if (*meta_opaque).version != 0 {
return;
}
am_build::build(index);
let xlog = pgrx::pg_sys::GenericXLogStart(index);
let xlog_meta_page = pgrx::pg_sys::GenericXLogRegisterBuffer(
xlog,
*meta_buffer,
pgrx::pg_sys::GENERIC_XLOG_FULL_IMAGE as _,
);
pgrx::pg_sys::PageInit(
meta_page,
xlog_meta_page,
pgrx::pg_sys::BLCKSZ as usize,
std::mem::size_of::<VectorsPageOpaqueData>(),
);
pgrx::pg_sys::GenericXLogFinish(state);
pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer);
let xlog_meta_header = xlog_meta_page.cast::<pgrx::pg_sys::PageHeaderData>();
let xlog_meta_offset = (*xlog_meta_header).pd_special as usize;
let xlog_meta_opaque = xlog_meta_page
.add(xlog_meta_offset)
.cast::<VectorsPageOpaqueData>();
(*xlog_meta_opaque).version = 1;
pgrx::pg_sys::GenericXLogFinish(xlog);
}
}

unsafe fn check_well_formed(index_relation: pgrx::pg_sys::Relation) {
if !test_well_formed(index_relation) {
am_build::build(index_relation, None);
make_well_formed(index_relation);
unsafe fn check_well_formed(index: pgrx::pg_sys::Relation) {
if !test_well_formed(index) {
make_well_formed(index);
}
}

unsafe fn test_well_formed(index_relation: pgrx::pg_sys::Relation) -> bool {
pgrx::pg_sys::RelationGetNumberOfBlocksInFork(
index_relation,
pgrx::pg_sys::ForkNumber_MAIN_FORKNUM,
) == 1
unsafe fn test_well_formed(index: pgrx::pg_sys::Relation) -> bool {
unsafe {
let meta_buffer = scopeguard::guard(
{
let meta_buffer = pgrx::pg_sys::ReadBuffer(index, 0);
pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_SHARE as _);
meta_buffer
},
|meta_buffer| {
pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer);
},
);
let meta_page = pgrx::pg_sys::BufferGetPage(*meta_buffer);
let meta_header = meta_page.cast::<pgrx::pg_sys::PageHeaderData>();
let meta_offset = (*meta_header).pd_special as usize;
let meta_opaque = meta_page.add(meta_offset).cast::<VectorsPageOpaqueData>();
(*meta_opaque).version == 1
}
}

#[pgrx::pg_guard]
pub unsafe extern "C" fn aminsert(
index_relation: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
values: *mut Datum,
is_null: *mut bool,
heap_tid: pgrx::pg_sys::ItemPointer,
_heap_relation: pgrx::pg_sys::Relation,
_heap: pgrx::pg_sys::Relation,
_check_unique: pgrx::pg_sys::IndexUniqueCheck,
_index_unchanged: bool,
_index_info: *mut pgrx::pg_sys::IndexInfo,
) -> bool {
check_well_formed(index_relation);
let oid = (*index_relation).rd_id;
check_well_formed(index);
let oid = (*index).rd_id;
let id = get_handle(oid);
let vector = from_datum(*values.add(0), *is_null.add(0));
if let Some(v) = vector {
Expand All @@ -213,14 +274,14 @@ pub unsafe extern "C" fn aminsert(

#[pgrx::pg_guard]
pub unsafe extern "C" fn ambeginscan(
index_relation: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
n_keys: std::os::raw::c_int,
n_orderbys: std::os::raw::c_int,
) -> pgrx::pg_sys::IndexScanDesc {
check_well_formed(index_relation);
check_well_formed(index);
assert!(n_keys == 0);
assert!(n_orderbys == 1);
am_scan::make_scan(index_relation)
am_scan::make_scan(index)
}

#[pgrx::pg_guard]
Expand Down Expand Up @@ -261,6 +322,8 @@ pub unsafe extern "C" fn ambulkdelete(
) -> *mut pgrx::pg_sys::IndexBulkDeleteResult {
if !test_well_formed((*info).index) {
pgrx::warning!("The vector index is not initialized.");
let result = pgrx::PgBox::<pgrx::pg_sys::IndexBulkDeleteResult>::alloc0();
return result.into_pg();
}
let oid = (*(*info).index).rd_id;
let id = get_handle(oid);
Expand Down
42 changes: 20 additions & 22 deletions src/index/am_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@ use pgrx::pg_sys::{IndexBuildResult, IndexInfo, RelationData};

pub struct Builder {
pub rpc: ClientRpc,
pub heap_relation: *mut RelationData,
pub heap: *mut RelationData,
pub index_info: *mut IndexInfo,
pub result: *mut IndexBuildResult,
}

pub unsafe fn build(
index: pgrx::pg_sys::Relation,
data: Option<(*mut RelationData, *mut IndexInfo, *mut IndexBuildResult)>,
) {
pub unsafe fn build(index: pgrx::pg_sys::Relation) {
let oid = (*index).rd_id;
let id = get_handle(oid);
let options = options(index);
Expand All @@ -31,26 +28,27 @@ pub unsafe fn build(
}
}
super::hook_maintain::maintain_index_in_index_create(id);
if let Some((heap_relation, index_info, result)) = data {
let mut builder = Builder {
rpc,
heap_relation,
index_info,
result,
};
pgrx::pg_sys::IndexBuildHeapScan(
heap_relation,
index,
index_info,
Some(callback),
&mut builder,
);
}
}

pub unsafe fn build_insertions(
index: pgrx::pg_sys::Relation,
heap: *mut RelationData,
index_info: *mut IndexInfo,
result: *mut IndexBuildResult,
) {
let rpc = check_client(crate::ipc::client());
let mut builder = Builder {
rpc,
heap,
index_info,
result,
};
pgrx::pg_sys::IndexBuildHeapScan(heap, index, index_info, Some(callback), &mut builder);
}

#[pgrx::pg_guard]
unsafe extern "C" fn callback(
index_relation: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
ctid: pgrx::pg_sys::ItemPointer,
values: *mut pgrx::pg_sys::Datum,
is_null: *mut bool,
Expand All @@ -62,7 +60,7 @@ unsafe extern "C" fn callback(
(*state.result).heap_tuples += 1.0;
return;
}
let oid = (*index_relation).rd_id;
let oid = (*index).rd_id;
let id = get_handle(oid);
let vector = from_datum(*values.add(0), *is_null.add(0));
let vector = match vector {
Expand Down
4 changes: 2 additions & 2 deletions src/index/am_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ pub enum Scanner {
Empty {},
}

pub unsafe fn make_scan(index_relation: pgrx::pg_sys::Relation) -> pgrx::pg_sys::IndexScanDesc {
pub unsafe fn make_scan(index: pgrx::pg_sys::Relation) -> pgrx::pg_sys::IndexScanDesc {
use pgrx::PgMemoryContexts;

let scan = pgrx::pg_sys::RelationGetIndexScan(index_relation, 0, 1);
let scan = pgrx::pg_sys::RelationGetIndexScan(index, 0, 1);

(*scan).xs_recheck = false;
(*scan).xs_recheckorderby = false;
Expand Down
9 changes: 4 additions & 5 deletions src/index/am_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ unsafe fn convert_varlena_to_soi(
}
}

pub unsafe fn options(index_relation: pgrx::pg_sys::Relation) -> IndexOptions {
let opfamily = unsafe { (*index_relation).rd_opfamily.read() };
let att = unsafe { &mut *(*index_relation).rd_att };
pub unsafe fn options(index: pgrx::pg_sys::Relation) -> IndexOptions {
let opfamily = unsafe { (*index).rd_opfamily.read() };
let att = unsafe { &mut *(*index).rd_att };
let atts = unsafe { att.attrs.as_slice(att.natts as _) };
if atts.is_empty() {
pgrx::error!("indexing on no columns is not supported");
Expand All @@ -116,8 +116,7 @@ pub unsafe fn options(index_relation: pgrx::pg_sys::Relation) -> IndexOptions {
// get v, d
let (v, d) = convert_opfamily_to_vd(opfamily).unwrap();
// get segment, optimizing, indexing
let (segment, optimizing, indexing) =
unsafe { convert_varlena_to_soi((*index_relation).rd_options) };
let (segment, optimizing, indexing) = unsafe { convert_varlena_to_soi((*index).rd_options) };
IndexOptions {
vector: VectorOptions { dims, v, d },
segment,
Expand Down
Loading