0.2.0 (#9)
* explicitly specify hasher in default HashMap impl (#3)

as per clippy's implicit hasher lint

* set aHash as default hasher, add DefaultBuildHasher type (#4)

* add a public DefaultHashBuilder type (aHash)

* hashbrown switched from Fx Hash to aHash, already a good indicator
* aHash provides better performance than Fx Hash on strings when running
  on x86 processors, which is the exact use case I am targeting
* having a DefaultHashBuilder type allows users to be slightly more
  insulated to changes in the hashing algorithm in the future
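
The points above amount to exporting the hasher behind a crate-owned name. A minimal sketch of the pattern, using std's `RandomState` as a stand-in for aHash's builder (the alias name matches cht's public type, but the body here is illustrative):

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashMap;

// Crate-public alias; cht points this at aHash's builder, while std's
// RandomState stands in here so the sketch is self-contained.
pub type DefaultHashBuilder = RandomState;

pub fn default_map() -> HashMap<&'static str, i32, DefaultHashBuilder> {
    // Callers name DefaultHashBuilder instead of a concrete hasher type,
    // so a future change of hashing algorithm is not a breaking API change.
    HashMap::with_hasher(DefaultHashBuilder::default())
}

fn main() {
    let mut map = default_map();
    map.insert("key", 1);
    assert_eq!(map.get("key"), Some(&1));
}
```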

* clean up benchmarks

* apply some clippy lints
* change mutex benchmark so it compiles (use aHash instead of Fx Hash)

* bump version requirements, add num_cpus dependency

* set number of threads equivalent to hw core count

* clean up incorrect automerge (Fx Hash/aHash)

* relax memory orderings (#5)

* relax atomic memory orderings

these are almost certainly wrong

* use acquire/acquire cas semantics when swinging

this is to ensure that all writes to the returned bucket array are seen
it would be better if we had consume loads but that's not possible
unfortunately
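
A sketch of a swing under those orderings, with illustrative names (cht swings a pointer to its bucket array; the element type is simplified here). Acquire on both the success and failure paths means that whichever array pointer the thread ends up holding, the writes made by the thread that installed it are visible:

```rust
use std::sync::atomic::{AtomicPtr, Ordering};

// Try to install `new`; on failure, adopt whatever pointer won the race.
fn swing(current: &AtomicPtr<u32>, old: *mut u32, new: *mut u32) -> *mut u32 {
    match current.compare_exchange(old, new, Ordering::Acquire, Ordering::Acquire) {
        Ok(_) => new,          // we installed the new array
        Err(actual) => actual, // someone beat us to it; use their array
    }
}

fn main() {
    let old = Box::into_raw(Box::new(1u32));
    let new = Box::into_raw(Box::new(2u32));
    let current = AtomicPtr::new(old);
    assert_eq!(swing(&current, old, new), new);
    // A losing swing returns the pointer that actually got installed.
    assert_eq!(swing(&current, old, new), new);
    unsafe {
        drop(Box::from_raw(old));
        drop(Box::from_raw(new));
    }
}
```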

* add read-modify-write functions (#8)

* simplify * implementations to use *_and functions

look at all that code we deleted. beautiful

* write and document public interface for RMW ops

* insert_or_modify - insert a new value or mutate an existing one
* insert_with_or_modify - insert_or_modify using a function to get the
  value to insert
* insert_or_modify_and - insert_or_modify that returns the result of
  invoking a function on the previous value
* insert_with_or_modify_and - insert_with_or_modify that returns the
  result of invoking a function on the previous value
* modify - mutate an existing value
* modify_and - modify that returns the result of invoking a function on
  the previous value
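
The semantics of the core operation can be sketched sequentially over a std HashMap; cht's real version is lock-free, and this signature is illustrative rather than cht's exact API:

```rust
use std::collections::HashMap;

// Insert `value` if `key` is absent; otherwise replace the existing value
// with `on_modify(&existing)`. Returns the previous value, which is what
// the *_and variants expose to their caller's closure.
fn insert_or_modify<K: std::hash::Hash + Eq, V: Clone>(
    map: &mut HashMap<K, V>,
    key: K,
    value: V,
    mut on_modify: impl FnMut(&V) -> V,
) -> Option<V> {
    match map.get(&key).cloned() {
        Some(prev) => {
            let next = on_modify(&prev);
            map.insert(key, next);
            Some(prev)
        }
        None => {
            map.insert(key, value);
            None
        }
    }
}

fn main() {
    let mut map = HashMap::new();
    assert_eq!(insert_or_modify(&mut map, "k", 1, |v| v + 1), None);
    assert_eq!(insert_or_modify(&mut map, "k", 1, |v| v + 1), Some(1));
    assert_eq!(map["k"], 2);
}
```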

* fix README.md so it won't fail assertions

d'oh

* refactor insert_or_modify

use insert_or_modify_and instead of insert_with_or_modify_and because
it's simpler to understand and less code

* use usize::next_power_of_two

kind of dumb to roll my own, but lessons learned and all that
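
For reference, `usize::next_power_of_two` rounds up to the nearest power of two (and maps 0 to 1), which is exactly the capacity rounding the hand-rolled version performed:

```rust
fn main() {
    // Requested capacities round up to the bucket array's power-of-two size.
    assert_eq!(0usize.next_power_of_two(), 1);
    assert_eq!(5usize.next_power_of_two(), 8);
    assert_eq!(64usize.next_power_of_two(), 64);
    assert_eq!(100usize.next_power_of_two(), 128);
}
```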

* normalize order of trait bounds in generics

* fix a use-after-free/double-free bug with buckets

it was previously possible for buckets to be involved in use-after-free
and double-free bugs. say some bucket is shared between three bucket
arrays, and that bucket is then removed from the latest array. if all
threads have a copy of the first bucket array, then when they try to
traverse the bucket array linked list they will only get to the second
bucket array, which still contains a pointer to the now-deleted bucket.
despite being marked as redirect, we still have to read the bucket's key
to determine if we need to walk the list. so subsequent threads would be
reading this memory after it was freed.

the fix is to keep an epoch count in each bucket array that is
incremented each time the buckets grow, and to return the bucket array
that was used to fulfill a get/insert/remove operation. when we walk the
bucket array list, we require that the final bucket array has an epoch
at least as high as the returned bucket array. it is supremely unlikely
that this epoch tag will overflow, since bucket arrays double
in size when they grow. so the hash map's bucket array after swinging
will be guaranteed to be up-to-date from the point of view of the
current thread, meaning there is no way for subsequent threads to read
pointers to now-freed buckets.
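
The invariant can be sketched with simplified, single-threaded types (the real map uses atomics and crossbeam-epoch guards; names here are illustrative):

```rust
// Every grow bumps the epoch, so newer bucket arrays carry larger epochs.
struct BucketArray {
    epoch: u64,
    next: Option<Box<BucketArray>>,
}

// Follow `next` pointers until reaching an array at least as new as
// `min_epoch` (the epoch returned by an earlier operation). None means the
// chain is too stale and the caller must reload the map's current array.
fn walk(mut array: &BucketArray, min_epoch: u64) -> Option<&BucketArray> {
    loop {
        if array.epoch >= min_epoch {
            return Some(array);
        }
        array = array.next.as_deref()?;
    }
}

fn main() {
    let chain = BucketArray {
        epoch: 1,
        next: Some(Box::new(BucketArray {
            epoch: 2,
            next: Some(Box::new(BucketArray { epoch: 3, next: None })),
        })),
    };
    assert_eq!(walk(&chain, 3).map(|a| a.epoch), Some(3));
    assert!(walk(&chain, 4).is_none());
}
```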

this commit also contains an improvement to HashMap::drop that removes
its reliance on a std::collections::HashSet&lt;usize&gt;. we know that all
buckets with a redirect tag are either duplicated in the next bucket
array or have been removed and are waiting to be destroyed when the GC
runs. so the way to know if a bucket should be deallocated in
HashMap::drop is to see if the bucket pointer has the redirect tag set;
if it is set, we can defer freeing it until later. otherwise we are ok
to free it immediately, since it won't be present in subsequent bucket
arrays and of course won't be visible in other threads (guarantees of
&mut).
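
A sketch of that test, assuming the redirect tag lives in a spare low bit of the aligned bucket pointer (the tag-in-pointer layout is an assumption of this sketch, not a statement of cht's exact representation):

```rust
// Aligned pointers have zero low bits, leaving room for a one-bit tag.
const REDIRECT_TAG: usize = 1;

// Tagged buckets live on in a later array or are queued for the GC, so
// drop defers them; untagged buckets are owned solely by this array and
// can be freed immediately.
fn is_redirect(ptr_bits: usize) -> bool {
    ptr_bits & REDIRECT_TAG != 0
}

fn main() {
    let aligned = 0x1000usize; // stand-in for a bucket pointer
    assert!(!is_redirect(aligned)); // safe to free now
    assert!(is_redirect(aligned | REDIRECT_TAG)); // defer to the GC
}
```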

finally, BucketArray::grow_into has been improved to check for a
supplanted flag while iterating and to set that supplanted flag when it
finishes operating. threads that encounter the supplanted flag can
simply skip the grow_into operation, since it has already been completed
by another thread.
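
That fast path can be sketched with an AtomicBool (names assumed; the flag's actual representation inside cht's bucket arrays may differ):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// The first thread to finish copying sets the flag with Release; later
// threads observe it with Acquire and skip the whole copy.
fn grow_into(supplanted: &AtomicBool, copy_buckets: impl FnOnce()) -> bool {
    if supplanted.load(Ordering::Acquire) {
        return false; // another thread already completed this grow
    }
    copy_buckets();
    supplanted.store(true, Ordering::Release);
    true
}

fn main() {
    let flag = AtomicBool::new(false);
    assert!(grow_into(&flag, || {})); // first caller does the work
    assert!(!grow_into(&flag, || {})); // later callers skip it
}
```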

* implement HashMap::modify{,_and}

this implementation seems pretty efficient; it doesn't make any
duplicate checks or allocations inside the CAS loop.

* remove K: Clone bound for insert_or_modify

* implement insert_or_modify, but a bug is exposed

* BucketArray::grow_into has a double-free/use-after-free bug

* keep hammering on it

* still no progress on the use-after-free/double-free bug in grow_into
* add some more tests to cover more overlapping operations
* refactor some code to "simplify it" according to my twisted desires

* bump versions of dependencies

* bump version to 0.2.0
Gregory-Meyer authored Feb 28, 2020
1 parent c0266d9 commit fdf573b
Showing 6 changed files with 1,151 additions and 343 deletions.
15 changes: 8 additions & 7 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "cht"
version = "0.1.2"
version = "0.2.0"
authors = ["Gregory Meyer <[email protected]>"]
edition = "2018"
description = "Lockfree resizeable concurrent hash table."
Expand All @@ -9,14 +9,15 @@ readme = "README.md"
license = "MIT"

[dependencies]
crossbeam-epoch = "0.7"
fxhash = "0.2"
ahash = "^0.3.2"
crossbeam-epoch = "^0.8.2"

[dev-dependencies]
criterion = "0.2"
hashbrown = "0.5"
lock_api = "0.2"
parking_lot = "0.8"
criterion = "^0.3.1"
hashbrown = "^0.7.0"
lock_api = "^0.3.3"
num_cpus = "^1.12.0"
parking_lot = "^0.10.0"

[[bench]]
name = "cht"
6 changes: 4 additions & 2 deletions README.md
@@ -12,7 +12,7 @@ and deletions.
In your `Cargo.toml`:

```toml
cht = "0.1"
cht = "^0.2.0"
```

Then in your code:
Expand All @@ -29,7 +29,9 @@ let threads: Vec<_> = (0..16)
let map = map.clone();

thread::spawn(move || {
for j in 0..64 {
const NUM_INSERTIONS: usize = 64;

for j in (i * NUM_INSERTIONS)..((i + 1) * NUM_INSERTIONS) {
map.insert_and(j, j, |prev| assert_eq!(prev, None));
}
})
12 changes: 6 additions & 6 deletions benches/cht.rs
@@ -22,17 +22,17 @@ fn bench_single_thread_insertion(c: &mut Criterion) {

b.iter(|| map.insert(criterion::black_box(numel + 1), numel + 1))
},
[8, 64, 512, 4096, 32768].into_iter(),
[8, 64, 512, 4096, 32768].iter(),
);
}

fn bench_multi_thread_insertion(c: &mut Criterion) {
const NUM_THREADS: usize = 64;
let num_threads = num_cpus::get();

let map = Arc::new(HashMap::new());
let keep_going = Arc::new(AtomicBool::new(true));

let threads: Vec<_> = (0..NUM_THREADS - 1)
let threads: Vec<_> = (0..num_threads - 1)
.map(|i| {
let map = map.clone();
let keep_going = keep_going.clone();
Expand All @@ -46,7 +46,7 @@ fn bench_multi_thread_insertion(c: &mut Criterion) {
.collect();

c.bench_function("cht: multithreaded insertion", move |b| {
b.iter(|| map.insert(criterion::black_box(NUM_THREADS + 1), NUM_THREADS + 1))
b.iter(|| map.insert(criterion::black_box(num_threads + 1), num_threads + 1))
});

keep_going.store(false, Ordering::SeqCst);
Expand All @@ -55,12 +55,12 @@ fn bench_multi_thread_insertion(c: &mut Criterion) {
}

fn bench_multi_thread_contended_insertion(c: &mut Criterion) {
const NUM_THREADS: usize = 64;
let num_threads = num_cpus::get();

let map = Arc::new(HashMap::new());
let keep_going = Arc::new(AtomicBool::new(true));

let threads: Vec<_> = (0..NUM_THREADS - 1)
let threads: Vec<_> = (0..num_threads - 1)
.map(|_| {
let map = map.clone();
let keep_going = keep_going.clone();
42 changes: 20 additions & 22 deletions benches/mutex.rs
@@ -1,5 +1,4 @@
use std::{
borrow::Borrow,
hash::{BuildHasher, Hash},
mem,
sync::{
Expand All @@ -9,19 +8,19 @@ use std::{
thread,
};

use ahash::RandomState;
use criterion::{criterion_group, criterion_main, Criterion};
use fxhash::FxBuildHasher;
use hashbrown::{hash_map::Entry, HashMap};
use parking_lot::RwLock;

struct ConcurrentHashMap<K: Hash + Eq, V, S: BuildHasher> {
map: RwLock<HashMap<K, Arc<RwLock<V>>, S>>,
}

impl<K: Hash + Eq, V> ConcurrentHashMap<K, V, FxBuildHasher> {
fn new() -> ConcurrentHashMap<K, V, FxBuildHasher> {
impl<K: Hash + Eq, V> ConcurrentHashMap<K, V, RandomState> {
fn new() -> ConcurrentHashMap<K, V, RandomState> {
ConcurrentHashMap {
map: RwLock::new(HashMap::with_hasher(FxBuildHasher::default())),
map: RwLock::new(HashMap::with_hasher(RandomState::default())),
}
}
}
@@ -55,32 +54,31 @@ impl&lt;K: Hash + Eq, V, S: BuildHasher&gt; ConcurrentHashMap&lt;K, V, S&gt; {
}
}
}

fn get<Q: Hash + Eq + ?Sized>(&self, key: &Q) -> Option<Arc<RwLock<V>>>
where
K: Borrow<Q>,
{
let guard = self.map.read();
guard.get(key).cloned()
}
}

fn bench_single_thread_insertion(c: &mut Criterion) {
let map = ConcurrentHashMap::new();

c.bench_function(
c.bench_function_over_inputs(
"hashbrown/parking_lot: single threaded insertion",
move |b| b.iter(|| map.insert(criterion::black_box(5), 5)),
|b, &&numel| {
let map = ConcurrentHashMap::new();

for i in 0..numel {
map.insert(i, i);
}

b.iter(|| map.insert(criterion::black_box(numel + 1), numel + 1))
},
[8, 64, 512, 4096, 32768].iter(),
);
}

fn bench_multi_thread_insertion(c: &mut Criterion) {
const NUM_THREADS: usize = 64;
let num_threads = num_cpus::get();

let map = Arc::new(ConcurrentHashMap::new());
let keep_going = Arc::new(AtomicBool::new(true));

let threads: Vec<_> = (0..NUM_THREADS - 1)
let threads: Vec<_> = (0..num_threads - 1)
.map(|i| {
let map = map.clone();
let keep_going = keep_going.clone();
Expand All @@ -95,7 +93,7 @@ fn bench_multi_thread_insertion(c: &mut Criterion) {

c.bench_function("hashbrown/parking_lot: multithreaded insertion", move |b| {
b.iter(|| {
map.insert(criterion::black_box(NUM_THREADS + 1), NUM_THREADS + 1);
map.insert(criterion::black_box(num_threads + 1), num_threads + 1);
})
});

Expand All @@ -105,12 +103,12 @@ fn bench_multi_thread_insertion(c: &mut Criterion) {
}

fn bench_multi_thread_contended_insertion(c: &mut Criterion) {
const NUM_THREADS: usize = 64;
let num_threads = num_cpus::get();

let map = Arc::new(ConcurrentHashMap::new());
let keep_going = Arc::new(AtomicBool::new(true));

let threads: Vec<_> = (0..NUM_THREADS - 1)
let threads: Vec<_> = (0..num_threads - 1)
.map(|_| {
let map = map.clone();
let keep_going = keep_going.clone();
