Skip to content

Commit

Permalink
ensure drop() runs for all Bucket and BucketArray objects (#2)
Browse files Browse the repository at this point in the history
* run destructors for buckets and bucket arrays

Turns out crossbeam doesn't do this for you — the things you find out when you
RTFM...

* remove set module

it's not public, so we can just recover it from before this commit if necessary

* bump version to 0.1.2
  • Loading branch information
Gregory-Meyer authored Jun 26, 2019
1 parent 68ae808 commit c0266d9
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 176 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cht"
version = "0.1.1"
version = "0.1.2"
authors = ["Gregory Meyer <[email protected]>"]
edition = "2018"
description = "Lockfree resizeable concurrent hash table."
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
//! [Junction]: https://github.com/preshing/junction
pub mod map;
mod set; // it's not ready yet

pub use map::HashMap;

Expand Down
166 changes: 94 additions & 72 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use std::{
borrow::Borrow,
collections::HashSet,
hash::{BuildHasher, Hash, Hasher},
mem,
sync::{
Expand All @@ -35,7 +36,7 @@ use std::{
},
};

use crossbeam_epoch::{self, Atomic, Guard, Owned, Shared};
use crossbeam_epoch::{self, Atomic, Guard, Owned, Pointer, Shared};
use fxhash::FxBuildHasher;

/// A lockfree concurrent hash map implemented with open addressing and linear
Expand Down Expand Up @@ -486,15 +487,10 @@ impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap<K, V, S> {
}

let buckets = unsafe { buckets_ptr.deref() };
let BucketAndParentPtr {
bucket_ptr: found_bucket_ptr,
parent_ptr: new_buckets_ptr,
} = buckets.get(key, hash, guard);

if !new_buckets_ptr.is_null() {
self.buckets
.compare_and_set(buckets_ptr, new_buckets_ptr, Ordering::SeqCst, guard)
.ok();
let (found_bucket_ptr, redirected) = buckets.get(key, hash, guard);

if redirected {
self.try_swing_bucket_array_ptr(buckets_ptr, guard);
}

unsafe { found_bucket_ptr.as_ref() }
Expand All @@ -513,10 +509,7 @@ impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap<K, V, S> {
})
.into_shared(guard);

let BucketAndParentPtr {
bucket_ptr: previous_bucket_ptr,
parent_ptr: new_buckets_ptr,
} = buckets.insert(new_bucket, hash, guard);
let (previous_bucket_ptr, redirected) = buckets.insert(new_bucket, hash, guard);

if let Some(previous_bucket) = unsafe { previous_bucket_ptr.as_ref() } {
if previous_bucket.maybe_value.is_none() {
Expand All @@ -526,10 +519,12 @@ impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap<K, V, S> {
self.len.fetch_add(1, Ordering::SeqCst);
}

if !new_buckets_ptr.is_null() {
self.buckets
.compare_and_set(buckets_ptr, new_buckets_ptr, Ordering::SeqCst, guard)
.ok();
if !previous_bucket_ptr.is_null() {
unsafe { guard.defer_destroy(previous_bucket_ptr) };
}

if redirected {
self.try_swing_bucket_array_ptr(buckets_ptr, guard);
}

unsafe { previous_bucket_ptr.as_ref() }
Expand All @@ -552,11 +547,16 @@ impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap<K, V, S> {
let buckets_ref = unsafe { buckets_ptr.deref() };
let hash = self.get_hash(key);

unsafe { buckets_ref.remove(key, hash, None, guard).as_ref() }.map(|b| {
let removed_ptr = buckets_ref.remove(key, hash, None, guard);

if !removed_ptr.is_null() {
unsafe { guard.defer_destroy(removed_ptr) };
self.len.fetch_sub(1, Ordering::SeqCst);

b
})
Some(unsafe { removed_ptr.deref() })
} else {
None
}
}

fn get_hash<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> u64 {
Expand All @@ -583,7 +583,7 @@ impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap<K, V, S> {
};

match self.buckets.compare_and_set_weak(
buckets_ptr,
Shared::null(),
new_buckets,
Ordering::SeqCst,
guard,
Expand All @@ -599,6 +599,62 @@ impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap<K, V, S> {
}
}
}

/// Attempts to advance `self.buckets` from `current_ptr` to the next bucket
/// array installed by a resize, retiring the old array if this thread wins
/// the race.
///
/// # Panics
///
/// Panics if `current_ptr` is null or if `current.next_array` is null, i.e.
/// if called before a resize has installed a successor array.
fn try_swing_bucket_array_ptr(
    &self,
    current_ptr: Shared<'g, BucketArray<K, V, S>>,
    guard: &'g Guard,
) {
    assert!(!current_ptr.is_null());

    let current = unsafe { current_ptr.deref() };
    let next_array_ptr = current.next_array.load(Ordering::SeqCst, guard);
    assert!(!next_array_ptr.is_null());

    // A weak CAS is acceptable: spurious failure just means the swing did
    // not happen on this call. NOTE(review): this assumes some later caller
    // (this thread or another) will re-attempt the swing — confirm callers.
    if self
        .buckets
        .compare_and_set_weak(current_ptr, next_array_ptr, Ordering::SeqCst, guard)
        .is_ok()
    {
        // We won the CAS, so this thread is solely responsible for retiring
        // the old array; defer_destroy drops it once no guard can reach it.
        unsafe {
            guard.defer_destroy(current_ptr);
        }
    }
}
}

impl<K: Eq + Hash, V, S: BuildHasher> Drop for HashMap<K, V, S> {
    /// Frees every bucket array in the resize chain and every bucket exactly
    /// once. Buckets may be reachable from more than one array (redirected
    /// entries), so freed pointers are deduplicated before dropping.
    fn drop(&mut self) {
        // SAFETY: `drop` has exclusive access (&mut self), so no other thread
        // can observe these pointers; an unprotected guard is sufficient.
        let guard = unsafe { crossbeam_epoch::unprotected() };

        let mut buckets_ptr = self.buckets.swap(Shared::null(), Ordering::SeqCst, guard);
        // Set of raw bucket addresses already dropped, to avoid double-frees
        // for buckets shared between an array and its successor.
        let mut freed_buckets = HashSet::with_hasher(FxBuildHasher::default());

        // Walk the chain of bucket arrays created by successive resizes.
        while !buckets_ptr.is_null() {
            let this_bucket_array = unsafe { buckets_ptr.deref() };
            let new_buckets_ptr =
                this_bucket_array
                    .next_array
                    .swap(Shared::null(), Ordering::SeqCst, guard);

            for this_bucket in this_bucket_array.buckets.iter() {
                // Strip the tag so pointers to the same bucket compare equal
                // regardless of any redirect marking.
                let this_bucket_ptr = this_bucket
                    .swap(Shared::null(), Ordering::SeqCst, guard)
                    .with_tag(0);

                if this_bucket_ptr.is_null() {
                    continue;
                }

                // `insert` returns true only the first time this address is
                // seen; only then do we take ownership and drop the bucket.
                if freed_buckets.insert(this_bucket_ptr.into_usize()) {
                    mem::drop(unsafe { this_bucket_ptr.into_owned() });
                }
            }

            // Drop the array itself, then continue with its successor.
            mem::drop(unsafe { buckets_ptr.into_owned() });
            buckets_ptr = new_buckets_ptr;
        }
    }
}

struct BucketArray<K: Hash + Eq, V, S: BuildHasher> {
Expand Down Expand Up @@ -634,7 +690,7 @@ impl<'g, K: Hash + Eq, V, S: BuildHasher> BucketArray<K, V, S> {
key: &Q,
hash: u64,
guard: &'g Guard,
) -> BucketAndParentPtr<'g, K, V, S>
) -> (Shared<'g, Bucket<K, V>>, bool)
where
K: Borrow<Q>,
{
Expand All @@ -650,44 +706,46 @@ impl<'g, K: Hash + Eq, V, S: BuildHasher> BucketArray<K, V, S> {
if this_bucket_ref.key.borrow() != key {
continue;
} else if this_bucket_ptr.tag() != REDIRECT_TAG {
return BucketAndParentPtr::without_parent(this_bucket_ptr);
return (this_bucket_ptr, false);
}

let next_array_ptr = self.next_array.load(Ordering::SeqCst, guard);
assert!(!next_array_ptr.is_null());
let next_array = unsafe { next_array_ptr.deref() };
self.grow_into(next_array, guard);

return next_array
.get(key, hash, guard)
.parent_ptr_or(next_array_ptr);
let mut result = next_array.get(key, hash, guard);
result.1 = true;

return result;
} else {
return BucketAndParentPtr::null();
return (Shared::null(), false);
}
}

BucketAndParentPtr::null()
(Shared::null(), false)
}

fn insert(
&self,
bucket_ptr: Shared<'g, Bucket<K, V>>,
hash: u64,
guard: &'g Guard,
) -> BucketAndParentPtr<'g, K, V, S> {
) -> (Shared<'g, Bucket<K, V>>, bool) {
assert!(!bucket_ptr.is_null());

let bucket = unsafe { bucket_ptr.deref() };
let capacity = self.buckets.len();
let len = self.len.load(Ordering::SeqCst);

let insert_into = |next_array_ptr: Shared<'g, BucketArray<K, V, S>>| {
let insert_into = |next_array_ptr: Shared<'_, BucketArray<K, V, S>>| {
assert!(!next_array_ptr.is_null());
let next_array = unsafe { next_array_ptr.deref() };

next_array
.insert(bucket_ptr, hash, guard)
.parent_ptr_or(next_array_ptr)
let mut result = next_array.insert(bucket_ptr, hash, guard);
result.1 = true;

result
};

// grow if inserting would push us over a load factor of 0.5
Expand Down Expand Up @@ -747,12 +805,9 @@ impl<'g, K: Hash + Eq, V, S: BuildHasher> BucketArray<K, V, S> {
if should_increment_len {
// replaced a tombstone
self.len.fetch_add(1, Ordering::SeqCst);

return BucketAndParentPtr::null();
} else {
// swapped with something
return BucketAndParentPtr::without_parent(this_bucket_ptr);
}

return (this_bucket_ptr, false);
}
Err(e) => this_bucket_ptr = e.current,
}
Expand Down Expand Up @@ -931,39 +986,6 @@ impl<'g, K: Hash + Eq, V, S: BuildHasher> BucketArray<K, V, S> {
}
}

/// Result pair returned by `BucketArray::get`/`insert`: the bucket that was
/// found (or null), plus the newest bucket array encountered while chasing
/// redirects (null when the operation stayed in the original array).
#[derive(Copy, Clone)]
struct BucketAndParentPtr<'g, K: Hash + Eq, V, S: BuildHasher> {
    // The bucket located by the operation; null if the key was absent.
    bucket_ptr: Shared<'g, Bucket<K, V>>,
    // Non-null only when a redirect was followed; the caller should try to
    // swing the map's top-level array pointer to this newer array.
    parent_ptr: Shared<'g, BucketArray<K, V, S>>,
}

impl<'g, K: Hash + Eq, V, S: BuildHasher> BucketAndParentPtr<'g, K, V, S> {
    /// Wraps a bucket pointer with no parent array (no redirect followed).
    fn without_parent(bucket_ptr: Shared<'g, Bucket<K, V>>) -> Self {
        Self {
            bucket_ptr,
            parent_ptr: Shared::null(),
        }
    }

    /// The empty result: no bucket found, no redirect followed.
    fn null() -> Self {
        Self::without_parent(Shared::null())
    }

    /// Returns `self` with `parent_ptr` defaulted to the supplied pointer
    /// when no parent has been recorded yet; an existing parent wins.
    fn parent_ptr_or(self, parent_ptr: Shared<'g, BucketArray<K, V, S>>) -> Self {
        if self.parent_ptr.is_null() {
            Self { parent_ptr, ..self }
        } else {
            self
        }
    }
}

#[repr(align(2))]
struct Bucket<K: Hash + Eq, V> {
key: K,
Expand Down
102 changes: 0 additions & 102 deletions src/set.rs

This file was deleted.

0 comments on commit c0266d9

Please sign in to comment.