From 5db7382ca6d1b8938612a167a303cf90c0276ab5 Mon Sep 17 00:00:00 2001 From: "Gregory Meyer (gregjm)" Date: Thu, 5 Mar 2020 17:08:33 -0500 Subject: [PATCH] 0.3.0 (#13) * Fix double free bug, some interface redesign (#10) * fix double free, change insert_or_modify interface fixing the double free was surprisingly subtle, and involved a few small changes to the algorithm to bring it more in line with Cliff Click's 2007 lockfree hash table. first, it is no longer possible for multiple rehashing operations to be in flight at once. this is primarily for simplicity's sake, but Click claims it's faster this way. next, bucket pointers that have been copied from an old table are marked with a tag bit. this prevents writes on the old table from overwriting writes on the new table. finally, removal no longer requires the key to be cloned. instead, the value is destroyed via MaybeUninit shenanigans and a tombstone bit is set on the bucket pointer. the benefit here is that we are able to remove elements without forcing a memory allocation on the user. the interface of the insert_or_modify family of functions was changed to require a function that has both key and value parameters. imo there was never a solid reason not to do that, but this allows arbitrary data to be accessed from the old key before disposing of it. additionally, the Bucket/BucketArray classes were completely rewritten and moved into a module of their own. i am more satisfied with the amount of code duplication now, but i believe there is more work to be done to unify the implementations of get, insert, and remove. in the future, i hope to use this separate module to create a striped hash table similar to Java 7's ConcurrentHashMap. this should significantly increase write performance. note that this commit includes an internal remove_if interface, which is intended to address #7. 
* move map tests into crate::map::tests * add bucket key/value destructor tests ensure that destructors are called for keys and values in a timely manner * correct growth policy to grow by a factor of 2 previous behavior would result in growing by a factor of 4 (!) * Add remove_if family of functions (#11) * add remove_if family of functions `remove_if` and friends accept a condition function, which is `impl FnMut(&K, &V) -> bool`. If the condition returns true immediately before attempting to CAS the bucket pointer, the bucket is swapped. Otherwise, `None` is returned. * rewrite drivers to reduce number of function calls this way the backtraces will have one less function call listed * document remove_if functions * bump version to 0.3.0 A few breaking changes were made with the `modify` and `insert_or_modify` functions, so the next release requires a minor version bump (since we're in pre-1.0.0 territory) --- Cargo.toml | 2 +- README.md | 2 +- src/lib.rs | 593 +------------- src/map.rs | 1774 ++++++++++++++++------------------------- src/map/bucket.rs | 798 ++++++++++++++++++ src/map/tests.rs | 903 +++++++++++++++++++++ src/map/tests/util.rs | 132 +++ 7 files changed, 2503 insertions(+), 1701 deletions(-) create mode 100644 src/map/bucket.rs create mode 100644 src/map/tests.rs create mode 100644 src/map/tests/util.rs diff --git a/Cargo.toml b/Cargo.toml index bb82f60..98263b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cht" -version = "0.2.0" +version = "0.3.0" authors = ["Gregory Meyer "] edition = "2018" description = "Lockfree resizeable concurrent hash table." diff --git a/README.md b/README.md index 1245ef6..f73ac32 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ and deletions. 
In your `Cargo.toml`: ```toml -cht = "^0.2.0" +cht = "^0.3.0" ``` Then in your code: diff --git a/src/lib.rs b/src/lib.rs index 9eeaa9a..a8a8925 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ // MIT License // -// Copyright (c) 2019 Gregory Meyer +// Copyright (c) 2020 Gregory Meyer // // Permission is hereby granted, free of charge, to any person // obtaining a copy of this software and associated documentation files @@ -34,594 +34,3 @@ pub mod map; pub use map::HashMap; - -#[cfg(test)] -mod tests { - use super::*; - - use std::{ - sync::{Arc, Barrier}, - thread::{self, JoinHandle}, - }; - - #[test] - fn hash_map_insertion() { - const MAX_VALUE: i32 = 512; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, i), None); - - assert!(!map.is_empty()); - assert_eq!(map.len(), (i + 1) as usize); - - for j in 0..=i { - assert_eq!(map.get(&j), Some(j)); - assert_eq!(map.insert(j, j), Some(j)); - } - - for k in i + 1..MAX_VALUE { - assert_eq!(map.get(&k), None); - } - } - } - - #[test] - fn hash_map_growth() { - const MAX_VALUE: i32 = 512; - - let map = HashMap::new(); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, i), None); - - assert!(!map.is_empty()); - assert_eq!(map.len(), (i + 1) as usize); - - for j in 0..=i { - assert_eq!(map.get(&j), Some(j)); - assert_eq!(map.insert(j, j), Some(j)); - } - - for k in i + 1..MAX_VALUE { - assert_eq!(map.get(&k), None); - } - } - } - - #[test] - fn hash_map_concurrent_insertion() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = Arc::new(HashMap::with_capacity(MAX_INSERTED_VALUE as usize)); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * 
MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_concurrent_growth() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = Arc::new(HashMap::new()); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(|t| t.join()) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_removal() { - const MAX_VALUE: i32 = 512; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, i), None); - } - - for i in 0..MAX_VALUE { - assert_eq!(map.remove(&i), Some(i)); - } - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn hash_map_concurrent_removal() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.insert(i, i), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) 
- .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.remove(&j), Some(j)); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(|t| t.join()) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), 0); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn hash_map_concurrent_insertion_and_removal() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; - const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; - - let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.insert(i, i), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); - - let insert_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - let remove_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) - { - assert_eq!(map.remove(&j), Some(j)); - } - }) - }) - .collect(); - - for result in insert_threads - .into_iter() - .chain(remove_threads.into_iter()) - .map(|t| t.join()) - { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), INSERTED_MIDPOINT as usize); - - for i in 0..INSERTED_MIDPOINT { - assert_eq!(map.get(&i), Some(i)); - } - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn 
hash_map_concurrent_growth_and_removal() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; - const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; - - let map = HashMap::with_capacity(INSERTED_MIDPOINT as usize); - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.insert(i, i), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); - - let insert_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - let remove_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) - { - assert_eq!(map.remove(&j), Some(j)); - } - }) - }) - .collect(); - - for result in insert_threads - .into_iter() - .chain(remove_threads.into_iter()) - .map(JoinHandle::join) - { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), INSERTED_MIDPOINT as usize); - - for i in 0..INSERTED_MIDPOINT { - assert_eq!(map.get(&i), Some(i)); - } - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn hash_map_modify() { - let map = HashMap::new(); - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - assert_eq!(map.modify("foo", |x| x * 2), None); - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - map.insert("foo", 1); - assert_eq!(map.modify("foo", |x| x * 2), Some(1)); - - assert!(!map.is_empty()); - assert_eq!(map.len(), 1); - - map.remove("foo"); - assert_eq!(map.modify("foo", |x| x * 2), None); - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); 
- } - - #[test] - fn hash_map_concurrent_modification() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - map.insert(i, i); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (i as i32 * MAX_VALUE)..((i as i32 + 1) * MAX_VALUE) { - assert_eq!(map.modify(&j, |x| x * 2), Some(j)); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), Some(i * 2)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_modification() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, 0), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for i in 0..MAX_VALUE { - assert!(map.modify(&i, |x| x + 1).is_some()); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); - } - } - - #[test] - fn hash_map_insert_or_modify() { - let map = HashMap::new(); - - assert_eq!(map.insert_or_modify("foo", 1, |x| x + 1), None); - assert_eq!(map.get("foo"), 
Some(1)); - - assert_eq!(map.insert_or_modify("foo", 1, |x| x + 1), Some(1)); - assert_eq!(map.get("foo"), Some(2)); - } - - #[test] - fn hash_map_concurrent_insert_or_modify() { - const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = Arc::new(HashMap::new()); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - map.insert_or_modify(j, 1, |x| x + 1); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_insertion() { - const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = Arc::new(HashMap::with_capacity(MAX_VALUE as usize)); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - map.insert(j, j); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_growth() { - const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = Arc::new(HashMap::new()); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - map.insert(j, j); - } - }) - }) - .collect(); - - for result in 
threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_removal() { - const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - map.insert(i, i); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - let prev_value = map.remove(&j); - - if let Some(v) = prev_value { - assert_eq!(v, j); - } - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), None); - } - } -} diff --git a/src/map.rs b/src/map.rs index 3e407fd..e0e7118 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,6 +1,6 @@ // MIT License // -// Copyright (c) 2019 Gregory Meyer +// Copyright (c) 2020 Gregory Meyer // // Permission is hereby granted, free of charge, to any person // obtaining a copy of this software and associated documentation files @@ -25,18 +25,21 @@ //! A lockfree concurrent hash map implemented with open addressing and linear //! probing. 
+mod bucket; + +#[cfg(test)] +mod tests; + +use bucket::{Bucket, BucketArray, InsertOrModifyState, KeyOrOwnedBucket}; + use std::{ borrow::Borrow, - hash::{BuildHasher, Hash, Hasher}, - mem::{self}, - sync::{ - atomic::{self, AtomicUsize, Ordering}, - Arc, - }, + hash::{BuildHasher, Hash}, + sync::atomic::{self, AtomicUsize, Ordering}, }; use ahash::RandomState; -use crossbeam_epoch::{self, Atomic, Guard, Owned, Shared}; +use crossbeam_epoch::{self, Atomic, CompareAndSetError, Guard, Owned, Shared}; /// Default hasher for `HashMap`. /// @@ -57,21 +60,18 @@ pub type DefaultHashBuilder = RandomState; /// The default hashing algorithm is [aHash], a hashing algorithm that is /// accelerated by the [AES-NI] instruction set on x86 proessors. aHash provides /// some resistance to DoS attacks, but will not provide the same level of -/// resistance as something like [`RandomState`] from the standard library. +/// resistance as something like [`RandomState`]. /// /// The hashing algorithm to be used can be chosen on a per-`HashMap` basis /// using the [`with_hasher`] and [`with_capacity_and_hasher`] methods. /// -/// Key types must implement [`Hash`] and [`Eq`]. Additionally, if you are going -/// to be removing elements from this `HashMap`, the key type must also -/// implement [`Clone`], as `HashMap` uses tombstones for deletion. Any -/// operations that return a value require the value type to implement -/// [`Clone`], as elements may be in use by other threads and as such cannot be -/// moved from. +/// Key types must implement [`Hash`] and [`Eq`]. Any operations that return a +/// key or value require the return types to implement [`Clone`], as elements +/// may be in use by other threads and as such cannot be moved from. /// /// `HashMap` is inspired by Jeff Phreshing's hash tables implemented in /// [Junction], described in [this blog post]. In short, `HashMap` supports -/// fully concurrent lookups, insertions, and removals. 
+/// fully concurrent lookups, insertions, removals, and updates. /// /// [aHash]: https://docs.rs/ahash /// [AES-NI]: https://en.wikipedia.org/wiki/AES_instruction_set @@ -85,17 +85,16 @@ pub type DefaultHashBuilder = RandomState; /// [this blog post]: https://preshing.com/20160222/a-resizable-concurrent-map/ #[derive(Default)] pub struct HashMap { - buckets: Atomic>, + bucket_array: Atomic>, + build_hasher: S, len: AtomicUsize, - hash_builder: Arc, } impl HashMap { /// Creates an empty `HashMap`. /// /// The hash map is created with a capacity of 0 and will not allocate any - /// space for elements until the first insertion. However, the hash builder - /// `S` will be allocated on the heap. + /// space for elements until the first insertion. pub fn new() -> HashMap { HashMap::with_capacity_and_hasher(0, DefaultHashBuilder::default()) } @@ -103,48 +102,36 @@ impl HashMap { /// Creates an empty `HashMap` with space for at least `capacity` elements /// without reallocating. /// - /// If `capacity == 0`, the hash map will not allocate any space for - /// elements, but it will allocate space for the hash builder. + /// If `capacity == 0`, no allocations will occur. pub fn with_capacity(capacity: usize) -> HashMap { HashMap::with_capacity_and_hasher(capacity, DefaultHashBuilder::default()) } } impl HashMap { - /// Creates an empty `HashMap` that will use `hash_builder` to hash keys. + /// Creates an empty `HashMap` that will use `build_hasher` to hash keys. /// /// The created map will have a capacity of 0 and as such will not have any - /// space for elements allocated until the first insertion. However, the - /// hash builder `S` will be allocated on the heap. - pub fn with_hasher(hash_builder: S) -> HashMap { - HashMap::with_capacity_and_hasher(0, hash_builder) + /// space for elements allocated until the first insertion. 
+ pub fn with_hasher(build_hasher: S) -> HashMap { + HashMap::with_capacity_and_hasher(0, build_hasher) } /// Creates an empty `HashMap` that will hold at least `capacity` elements - /// without reallocating and that uses `hash_builder` to hash keys. - /// - /// If `capacity == 0`, the hash map will not allocate any space for - /// elements. However, the hash map will always allocate its hash builder - /// `S` on the heap. - pub fn with_capacity_and_hasher(capacity: usize, hash_builder: S) -> HashMap { - let hash_builder = Arc::new(hash_builder); - - if capacity == 0 { - HashMap { - buckets: Atomic::null(), - hash_builder, - len: AtomicUsize::new(0), - } + /// without reallocating and that uses `build_hasher` to hash keys. + /// + /// If `capacity == 0`, no allocations will occur. + pub fn with_capacity_and_hasher(capacity: usize, build_hasher: S) -> HashMap { + let bucket_array = if capacity == 0 { + Atomic::null() } else { - HashMap { - buckets: Atomic::new(BucketArray::with_capacity_hasher_and_epoch( - capacity + 1, - hash_builder.clone(), - 0, - )), - hash_builder, - len: AtomicUsize::new(0), - } + Atomic::new(BucketArray::with_capacity(0, capacity)) + }; + + Self { + bucket_array, + build_hasher, + len: AtomicUsize::new(0), } } @@ -166,35 +153,29 @@ impl HashMap { } /// Returns the number of elements this `HashMap` can hold without - /// reallocating. + /// reallocating a table. + /// + /// Note that all mutating operations, with the exception of removing + /// elements, incur at least one allocation for the associated bucket. /// - /// If invoked while this hash map is growing, it is possible for - /// [`len`](#method.len) to return a greater value than this function does. - /// This is because new elements are being inserted into the next array of - /// buckets, but the `HashMap`'s bucket pointer has not been swung up the - /// list yet. 
+ /// If there are insertion operations in flight, it is possible that a + /// new, larger table has already been allocated. pub fn capacity(&self) -> usize { let guard = &crossbeam_epoch::pin(); - let buckets_ptr = self.buckets.load_consume(guard); - - if buckets_ptr.is_null() { - return 0; - } - let buckets_ref = unsafe { buckets_ptr.deref() }; + let bucket_array_ptr = self.bucket_array.load_consume(guard); - buckets_ref.buckets.len() / 2 + unsafe { bucket_array_ptr.as_ref() } + .map(BucketArray::capacity) + .unwrap_or(0) } /// Returns a copy of the value corresponding to `key`. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. In addition, `V` must implement [`Clone`], as - /// the value may be concurrently removed at any moment, so the best we can - /// do is return a copy of it. - /// - /// If your `V` does not implement [`Clone`], you will have to use - /// [`get_and`] instead. + /// *must* match that of `K`. `V` must implement [`Clone`], as the value may + /// be deleted at any moment; the best we can do is to clone them while we + /// know they exist. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html @@ -205,18 +186,15 @@ impl HashMap { K: Borrow, V: Clone, { - self.get_and(key, V::clone) + self.get_key_value_and(key, |_, v| v.clone()) } /// Returns a copy of the key and value corresponding to `key`. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. In addition, `K` and `V` must implement - /// [`Clone`], as the bucket may be concurrently removed at any moment, so - /// the best we can do is return a copy of it. - /// - /// If your `K` or `V` do not implement [`Clone`], you will have to use - /// [`get_key_value_and`] instead. + /// *must* match that of `K`. 
`K` and `V` must implement [`Clone`], as the + /// bucket may be concurrently removed at any time; the best we can do is to + /// clone them while we know they exist. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html @@ -230,10 +208,10 @@ impl HashMap { self.get_key_value_and(key, |k, v| (k.clone(), v.clone())) } - /// Invokes `func` with a reference to the value corresponding to `key`. + /// Invokes `with_value` with a reference to the value corresponding to `key`. /// - /// `func` will only be invoked if there is a value associated with `key` - /// contained within this hash map. + /// `with_value` will only be invoked if there is a value associated with + /// `key` contained within this hash map. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` /// *must* match that of `K`. @@ -243,18 +221,18 @@ impl HashMap { pub fn get_and T, T>( &self, key: &Q, - func: F, + with_value: F, ) -> Option where K: Borrow, { - self.get_key_value_and(key, move |_, v| func(v)) + self.get_key_value_and(key, move |_, v| with_value(v)) } - /// Invokes `func` with a reference to the key and value corresponding to - /// `key`. + /// Invokes `with_entry` with a reference to the key and value corresponding + /// to `key`. /// - /// `func` will only be invoked if there is a value associated with `key` + /// `with_entry` will only be invoked if there is a value associated with `key` /// contained within this hash map. 
/// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` @@ -265,30 +243,55 @@ impl HashMap { pub fn get_key_value_and T, T>( &self, key: &Q, - func: F, + with_entry: F, ) -> Option where K: Borrow, { let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, key); + + let result; - self.get_bucket(key, guard) - .and_then(move |b| match &b.maybe_value { - Some(v) => Some(func(&b.key, v)), - None => None, - }) + loop { + match bucket_array_ref + .get(guard, hash, key) + .map(|p| unsafe { p.as_ref() }) + { + Ok(Some(Bucket { + key, + maybe_value: value, + })) => { + result = Some(with_entry(key, unsafe { &*value.as_ptr() })); + + break; + } + Ok(None) => { + result = None; + + break; + } + Err(_) => { + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } + } + } + + self.swing_bucket_array(guard, current_ref, bucket_array_ref); + + result } - /// Inserts a key-value pair into the hash map, then returns a copy of the - /// previous value associated with `key`. + /// Inserts a key-value pair, then returns a copy of the value previously + /// associated with `key`. /// /// If the key was not previously present in this hash map, [`None`] is /// returned. /// - /// `V` must implement [`Clone`] for this function, as it is possible that - /// other threads may still hold references to the value previously - /// associated with `key`. As such, the associated value cannot be moved - /// from. + /// `V` must implement [`Clone`], as other threads may hold references to + /// the associated value. 
/// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html @@ -296,18 +299,16 @@ impl HashMap { where V: Clone, { - self.insert_and(key, value, V::clone) + self.insert_entry_and(key, value, |_, v| v.clone()) } - /// Inserts a key-value pair into the hash map, then returns a copy of the - /// previous key-value pair. + /// Inserts a key-value pair, then returns a copy of the previous entry. /// /// If the key was not previously present in this hash map, [`None`] is /// returned. /// - /// `K` and `V` must implement [`Clone`] for this function, as it is - /// possible that other threads may still hold references to the key-value - /// pair previously associated with `key`. + /// `K` and `V` must implement [`Clone`], as other threads may hold + /// references to the entry. /// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html @@ -319,194 +320,88 @@ impl HashMap { self.insert_entry_and(key, value, |k, v| (k.clone(), v.clone())) } - /// Inserts a key-value pair into the hash map, then invokes `func` with the - /// previously-associated value. - /// - /// If the key was not previously present in this hash map, [`None`] is - /// returned and `func` is not invoked. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_and T, T>(&self, key: K, value: V, func: F) -> Option { - self.insert_entry_and(key, value, move |_, v| func(v)) - } - - /// Inserts a key-value pair into the hash map, then invokes `func` with the - /// new key and previously-associated value. + /// Inserts a key-value pair, then invokes `with_previous_value` with the + /// value previously associated with `key`. /// /// If the key was not previously present in this hash map, [`None`] is - /// returned and `func` is not invoked. 
+ /// returned and `with_previous_value` is not invoked. /// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_entry_and T, T>( + pub fn insert_and T, T>( &self, key: K, value: V, - func: F, + with_previous_value: F, ) -> Option { - let guard = &crossbeam_epoch::pin(); - - self.do_insert(key, value, guard).and_then(|bucket| { - bucket - .maybe_value - .as_ref() - .map(|previous_value| func(&bucket.key, previous_value)) - }) - } - - /// Insert the a new value if none is associated with `key` or replace the - /// value with the result of `on_modify`, then return a clone of the old - /// value. - /// - /// If there is no value associated with `key`, [`None`] will be returned - /// and `on_modify` will not be invoked. Otherwise, `on_modify` may be - /// invoked multiple times depending on how much write contention there is - /// on the bucket associated with `key`. - /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. - /// - /// `V` must implement [`Clone`] for this function, as it is possible that - /// other threads may still hold references to the value previously - /// associated with `key`. As such, the value previously associated with - /// `key` cannot be moved from. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html - pub fn insert_or_modify V>(&self, key: K, value: V, on_modify: F) -> Option - where - V: Clone, - { - self.insert_or_modify_and(key, value, on_modify, V::clone) - } - - /// Insert the result of `on_insert` if no value is associated with `key` or - /// replace the value with the result of `on_modify`, then return a clone of - /// the old value. 
- /// - /// If there is no value associated with `key`, `on_insert` will be invoked, - /// `on_modify` will not be invoked, and [`None`] will be returned. - /// Otherwise, `on_modify` may be invoked multiple times depending on how - /// much write contention there is on the bucket associate with `key`. - /// - /// It is possible for both `on_insert` and `on_modify` to be invoked, even - /// if [`None`] is returned, if other threads are also writing to the bucket - /// associated with `key`. - /// - /// `V` must implement [`Clone`] for this function, as it is possible that - /// other threads may still hold references to the value previously - /// associated with `key`. As such, the value previously associated with - /// `key` cannot be moved from. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html - pub fn insert_with_or_modify V, G: FnMut(&V) -> V>( - &self, - key: K, - on_insert: F, - on_modify: G, - ) -> Option - where - V: Clone, - { - self.insert_with_or_modify_and(key, on_insert, on_modify, V::clone) + self.insert_entry_and(key, value, move |_, v| with_previous_value(v)) } - /// Insert the a new value if none is associated with `key` or replace the - /// value with the result of `on_modify`, then return the result of - /// `with_old_value` using the old value. + /// Inserts a key-value pair, then invokes `with_previous_entry` with the + /// previous entry. /// - /// If there is no value associated with `key`, [`None`] will be returned - /// and `on_modify` and `with_old_value` will not be invoked. Otherwise, - /// `on_modify` may be invoked multiple times depending on how much write - /// contention there is on the bucket associated with `key`. - /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. 
+ /// If the key was not previously present in this hash map, [`None`] is + /// returned and `with_previous_entry` is not invoked. /// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_or_modify_and V, G: FnOnce(&V) -> T, T>( + pub fn insert_entry_and T, T>( &self, key: K, value: V, - on_modify: F, - with_old_value: G, - ) -> Option { - self.insert_with_or_modify_and(key, move || value, on_modify, with_old_value) - } - - /// Insert the result of `on_insert` if no value is associated with `key` or - /// replace the value with the result of `on_modify`, then return the result - /// of `with_old_value` using the old value. - /// - /// If there is no value associated with `key`, `on_insert` will be invoked, - /// `on_modify` and `with_old_value`, will not be invoked, and [`None`] will - /// be returned. Otherwise, `on_modify` may be invoked multiple times - /// depending on how much write contention there is on the bucket associate - /// with `key`. - /// - /// It is possible for both `on_insert` and `on_modify` to be invoked, even - /// if [`None`] is returned, if other threads are also writing to the bucket - /// associated with `key`. 
- /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_with_or_modify_and V, G: FnMut(&V) -> V, H: FnOnce(&V) -> T, T>( - &self, - key: K, - on_insert: F, - on_modify: G, - with_old_value: H, + with_previous_entry: F, ) -> Option { let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); + let mut bucket_ptr = Owned::new(Bucket::new(key, value)); - let hash = self.get_hash(&key); + let result; - let buckets_ptr = self.get_or_create_buckets(guard); - assert!(!buckets_ptr.is_null()); + loop { + while self.len.load(Ordering::Relaxed) > bucket_array_ref.capacity() { + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } - let buckets = unsafe { buckets_ptr.deref() }; + match bucket_array_ref.insert(guard, hash, bucket_ptr) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + if previous_bucket_ptr.tag() & bucket::TOMBSTONE_TAG != 0 { + self.len.fetch_add(1, Ordering::Relaxed); + result = None; + } else { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() })); + } - let BucketAndParent { - bucket: previous_bucket_ptr, - parent: new_buckets_ptr, - } = buckets.insert_or_modify( - KeyOrBucket::Key(key), - hash, - FunctionOrValue::Function(on_insert), - on_modify, - guard, - ); + unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) }; + } else { + self.len.fetch_add(1, Ordering::Relaxed); + result = None; + } - if new_buckets_ptr != buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); + break; + } + Err(p) => { + bucket_ptr = p; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } + } } - if !previous_bucket_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - 
atomic::fence(Ordering::Acquire); - mem::drop(previous_bucket_ptr.into_owned()); - }) - }; - } else { - self.len.fetch_add(1, Ordering::Relaxed); - } + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - unsafe { previous_bucket_ptr.as_ref() } - .and_then(move |b| b.maybe_value.as_ref().map(with_old_value)) + result } -} -impl HashMap { - /// Removes the value associated with `key` from the hash map, returning a - /// copy of that value if there was one contained in this hash map. + /// If there is a value associated with `key`, remove and return a copy of + /// it. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. `K` and `V` must implement [`Clone`] for this - /// function, as `K` must be cloned for the tombstone bucket and the - /// previously-associated value cannot be moved from, as other threads - /// may still hold references to it. + /// *must* match that of `K`. `V` must implement [`Clone`], as other + /// threads may hold references to the associated value. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html @@ -516,1019 +411,684 @@ impl HashMap { K: Borrow, V: Clone, { - self.remove_and(key, V::clone) + self.remove_entry_if_and(key, |_, _| true, |_, v| v.clone()) } - /// Removes the value associated with `key` from the hash map, returning a - /// copy of that key-value pair it was contained in this hash map. + /// If there is a value associated with `key`, remove it and return a copy + /// of the previous entity. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. `K` and `V` must implement [`Clone`] for this - /// function. `K` must be cloned twice: once for the tombstone bucket - /// and once for the return value; the previously-associated value cannot be - /// moved from, as other threads may still hold references to it. 
+ /// *must* match that of `K`. `K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html pub fn remove_entry(&self, key: &Q) -> Option<(K, V)> where - K: Borrow, + K: Borrow + Clone, V: Clone, { - self.remove_entry_and(key, |k, v| (k.clone(), v.clone())) + self.remove_entry_if_and(key, |_, _| true, |k, v| (k.clone(), v.clone())) } - /// Removes the value associated with `key` from the hash map, then returns - /// the result of invoking `func` with the previously-associated value. + /// If there is a value associated with `key`, remove it and return the + /// result of invoking `with_previous_value` with that value. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. `K` must implement [`Clone`] for this - /// function, as `K` must be cloned to create a tombstone bucket. + /// *must* match that of `K`. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html pub fn remove_and T, T>( &self, key: &Q, - func: F, + with_previous_value: F, ) -> Option where K: Borrow, { - self.remove_entry_and(key, move |_, v| func(v)) + self.remove_entry_if_and(key, |_, _| true, move |_, v| with_previous_value(v)) } - /// Removes the value associated with `key` from the hash map, then returns - /// the result of invoking `func` with the key and previously-associated - /// value. + /// If there is a value associated with `key`, remove it and return the + /// result of invoking `with_previous_entry` with that entry. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. 
`K` must implement [`Clone`] for this - /// function, as `K` must be cloned to create a tombstone bucket. + /// *must* match that of `K`. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html pub fn remove_entry_and T, T>( &self, key: &Q, - func: F, + with_previous_entry: F, ) -> Option where K: Borrow, { - let guard = &crossbeam_epoch::pin(); - - self.do_remove(key, guard) - .and_then(|bucket| bucket.maybe_value.as_ref().map(|v| func(&bucket.key, v))) + self.remove_entry_if_and(key, |_, _| true, with_previous_entry) } - /// Replace the value associated with `key` with the result of `on_modify` - /// and return a clone of the previous value. - /// - /// If there is no value associated with `key`, `on_modify` will not be - /// invoked and [`None`] will be returned. Otherwise, `on_modify` may be - /// invoked multiple times depending on how much write contention there is - /// on the bucket associate with `key`. + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove and return a copy of its + /// value. /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. + /// `condition` may be invoked one or more times, even if no entry was + /// removed. /// - /// `K` must implement [`Clone`] for this function to create a new bucket - /// if one already exists. `V` must implement [`Clone`] for this function, - /// as it is possible that other threads may still hold references to the - /// value previously associated with `key`. As such, the value previously - /// associated with `key` cannot be moved from. `Q` can be any borrowed form - /// of `K`, but [`Hash`] and [`Eq`] on `Q` *must* match that of `K`. 
+ /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. `K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - pub fn modify V>( + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn remove_if bool>( &self, key: &Q, - on_modify: F, + condition: F, ) -> Option where K: Borrow, V: Clone, { - self.modify_and(key, on_modify, V::clone) + self.remove_entry_if_and(key, condition, move |_, v| v.clone()) } - /// Replace the value associated with `key` with the result of `on_modify` - /// and return the result of invoking `with_old_value` on the old value. + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove and return a copy of it. /// - /// If there is no value associated with `key`, `on_modify` and - /// `with_old_value` will not be invoked and [`None`] will be returned. - /// Otherwise, `on_modify` may be invoked multiple times depending on how - /// much write contention there is on the bucket associate with `key`. + /// `condition` may be invoked one or more times, even if no entry was + /// removed. /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. - /// - /// `K` must implement [`Clone`] for this function to create a new bucket - /// if one already exists. `Q` can be any borrowed form of `K`, but - /// [`Hash`] and [`Eq`] on `Q` *must* match that of `K`. + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
`K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn remove_entry_if bool>( + &self, + key: &Q, + condition: F, + ) -> Option<(K, V)> + where + K: Clone + Borrow, + V: Clone, + { + self.remove_entry_if_and(key, condition, move |k, v| (k.clone(), v.clone())) + } + + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove it and return the result of + /// invoking `with_previous_value` with its value. + /// + /// `condition` may be invoked one or more times, even if no entry was + /// removed. If `condition` failed or there was no value associated with + /// `key`, `with_previous_entry` is not invoked and [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
+ /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - pub fn modify_and V, G: FnOnce(&V) -> T, T>( + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn remove_if_and bool, G: FnOnce(&V) -> T, T>( &self, key: &Q, - on_modify: F, - with_old_value: G, + condition: F, + with_previous_value: G, ) -> Option where K: Borrow, { - let guard = &crossbeam_epoch::pin(); - - self.do_modify(key, on_modify, guard) - .and_then(move |b| b.maybe_value.as_ref().map(with_old_value)) + self.remove_entry_if_and(key, condition, move |_, v| with_previous_value(v)) } -} -impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap { - fn get_bucket( + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove it and return the result of + /// invoking `with_previous_entry` with it. + /// + /// `condition` may be invoked one or more times, even if no entry was + /// removed. If `condition` failed or there was no value associated with + /// `key`, `with_previous_entry` is not invoked and [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
+ /// + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn remove_entry_if_and< + Q: Hash + Eq + ?Sized, + F: FnMut(&K, &V) -> bool, + G: FnOnce(&K, &V) -> T, + T, + >( &self, key: &Q, - guard: &'g Guard, - ) -> Option<&'g Bucket> + mut condition: F, + with_previous_entry: G, + ) -> Option where K: Borrow, { - let hash = self.get_hash(&key); + let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); - let buckets_ptr = self.buckets.load_consume(guard); + let result; - if buckets_ptr.is_null() { - return None; - } + loop { + match bucket_array_ref.remove_if(guard, hash, key, condition) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + self.len.fetch_sub(1, Ordering::Relaxed); + result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() })); - let buckets = unsafe { buckets_ptr.deref() }; - let (found_bucket_ptr, new_buckets_ptr) = buckets.get(key, hash, guard); + unsafe { bucket::defer_destroy_tombstone(guard, previous_bucket_ptr) }; + } else { + result = None; + } - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); + break; + } + Err(c) => { + condition = c; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } + } } - if let Some(found_bucket) = unsafe { found_bucket_ptr.as_ref() } { - assert!(found_bucket.key.borrow() == key); + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - Some(found_bucket) - } else { - None - } + result } - fn do_insert(&self, key: K, value: V, guard: &'g Guard) -> Option<&'g Bucket> { - let hash = self.get_hash(&key); - - let 
buckets_ptr = self.get_or_create_buckets(guard); - assert!(!buckets_ptr.is_null()); - let buckets = unsafe { buckets_ptr.deref() }; - - let new_bucket = Owned::new(Bucket { - key, - maybe_value: Some(value), - }) - .into_shared(guard); - - let (previous_bucket_ptr, new_buckets_ptr) = buckets.insert(new_bucket, hash, guard); - - // increment length if we replaced a null or tombstone bucket - if unsafe { previous_bucket_ptr.as_ref() } - .map(|b| b.maybe_value.is_none()) - .unwrap_or(true) - { - self.len.fetch_add(1, Ordering::Relaxed); - } - - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); - } - - if !previous_bucket_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(previous_bucket_ptr.into_owned()); - }) - }; - } - - unsafe { previous_bucket_ptr.as_ref() } + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return a copy of the previously associated value. + /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// + /// `V` must implement [`Clone`], as other threads may hold references to + /// the associated value. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_or_modify V>( + &self, + key: K, + value: V, + on_modify: F, + ) -> Option + where + V: Clone, + { + self.insert_with_or_modify_entry_and(key, move || value, on_modify, |_, v| v.clone()) } - fn do_remove( + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return a copy of the previous entry. 
+ /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// + /// `K` and `V` must implement [`Clone`], as other threads may hold + /// references to the entry. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_or_modify_entry V>( &self, - key: &Q, - guard: &'g Guard, - ) -> Option<&'g Bucket> + key: K, + value: V, + on_modify: F, + ) -> Option<(K, V)> where - K: Borrow + Clone, + K: Clone, + V: Clone, { - let buckets_ptr = self.buckets.load_consume(guard); - - if buckets_ptr.is_null() { - return None; - } - - let buckets_ref = unsafe { buckets_ptr.deref() }; - let hash = self.get_hash(key); - - let (removed_ptr, new_buckets_ptr) = buckets_ref.remove(key, hash, None, guard); - - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); - } - - if !removed_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(removed_ptr.into_owned()); - }) - }; - - self.len.fetch_sub(1, Ordering::Relaxed); - } - - unsafe { removed_ptr.as_ref() } + self.insert_with_or_modify_entry_and( + key, + move || value, + on_modify, + |k, v| (k.clone(), v.clone()), + ) } - fn do_modify V>( + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return a copy of the previously + /// associated value. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked + /// and [`None`] will be returned. `on_modify` may be invoked multiple + /// times, even if [`None`] is returned. Similarly, `on_insert` may be + /// invoked if [`Some`] is returned. 
+ /// + /// `V` must implement [`Clone`], as other threads may hold references to + /// the associated value. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_with_or_modify V, G: FnMut(&K, &V) -> V>( &self, - key: &Q, - modifier: F, - guard: &'g Guard, - ) -> Option<&'g Bucket> + key: K, + on_insert: F, + on_modify: G, + ) -> Option where - K: Borrow + Clone, + V: Clone, { - let buckets_ptr = self.buckets.load_consume(guard); - - if buckets_ptr.is_null() { - return None; - } - - let buckets = unsafe { buckets_ptr.deref() }; - let hash = self.get_hash(key); - - let (previous_bucket_ptr, new_buckets_ptr) = - buckets.modify(key, hash, modifier, None, guard); - - if !previous_bucket_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(previous_bucket_ptr.into_owned()); - }) - }; - } - - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); - } - - unsafe { previous_bucket_ptr.as_ref() } + self.insert_with_or_modify_entry_and(key, on_insert, on_modify, |_, v| v.clone()) } - fn get_hash(&self, key: &Q) -> u64 { - let mut hasher = self.hash_builder.build_hasher(); - key.hash(&mut hasher); - - hasher.finish() + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return a copy of the previous + /// entry. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked + /// and [`None`] will be returned. `on_modify` may be invoked multiple + /// times, even if [`None`] is returned. Similarly, `on_insert` may be + /// invoked if [`Some`] is returned. 
+ /// + /// `K` and `V` must implement [`Clone`], as other threads may hold + /// references to the entry. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_with_or_modify_entry V, G: FnMut(&K, &V) -> V>( + &self, + key: K, + on_insert: F, + on_modify: G, + ) -> Option<(K, V)> + where + K: Clone, + V: Clone, + { + self.insert_with_or_modify_entry_and(key, on_insert, on_modify, |k, v| { + (k.clone(), v.clone()) + }) } - fn get_or_create_buckets(&self, guard: &'g Guard) -> Shared<'g, BucketArray> { - const DEFAULT_CAPACITY: usize = 64; - - let mut buckets_ptr = self.buckets.load_consume(guard); - let mut maybe_new_buckets = None; - - loop { - if buckets_ptr.is_null() { - let new_buckets = match maybe_new_buckets.take() { - Some(b) => b, - None => Owned::new(BucketArray::with_capacity_hasher_and_epoch( - DEFAULT_CAPACITY, - self.hash_builder.clone(), - 0, - )), - }; - - match self.buckets.compare_and_set_weak( - Shared::null(), - new_buckets, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(new_buckets) => return new_buckets, - Err(e) => { - maybe_new_buckets = Some(e.new); - buckets_ptr = self.buckets.load_consume(guard); - } - } - } else { - return buckets_ptr; - } - } - } - - fn swing_bucket_array_ptr( + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return the result of invoking `with_old_value` with + /// the previously associated value. + /// + /// If there is no value associated with `key`, `with_old_value` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn insert_or_modify_and V, G: FnOnce(&V) -> T, T>( &self, - mut current_ptr: Shared<'g, BucketArray>, - new_ptr: Shared<'g, BucketArray>, - guard: &'g Guard, - ) { - assert!(!current_ptr.is_null()); - assert!(!new_ptr.is_null()); - - let minimum_epoch = unsafe { new_ptr.deref() }.epoch; - let mut current = unsafe { current_ptr.deref() }; - - while current.epoch < minimum_epoch { - let next_ptr = current.next_array.load_consume(guard); - assert!(!next_ptr.is_null()); - - match self.buckets.compare_and_set( - current_ptr, - next_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(current_ptr.into_owned()); - }); - } - - current_ptr = next_ptr; - } - Err(_) => current_ptr = self.buckets.load_consume(guard), - } - - current = unsafe { current_ptr.deref() }; - } - } -} - -impl Drop for HashMap { - fn drop(&mut self) { - let guard = unsafe { crossbeam_epoch::unprotected() }; - - let mut buckets_ptr = self.buckets.load_consume(guard); - - while !buckets_ptr.is_null() { - let this_bucket_array = unsafe { buckets_ptr.deref() }; - let new_buckets_ptr = this_bucket_array.next_array.load_consume(guard); - - for this_bucket in this_bucket_array.buckets.iter() { - let this_bucket_ptr = this_bucket.load_consume(guard); - - if this_bucket_ptr.is_null() || this_bucket_ptr.tag().has_redirect() { - continue; - } - - mem::drop(unsafe { this_bucket_ptr.into_owned() }); - } - - mem::drop(unsafe { buckets_ptr.into_owned() }); - buckets_ptr = new_buckets_ptr; - } - } -} - -struct BucketArray { - buckets: Vec>>, // len() is a power of 2 - len: AtomicUsize, - next_array: Atomic>, - hash_builder: Arc, - epoch: u64, -} - -impl BucketArray { - fn with_capacity_hasher_and_epoch( - capacity: usize, - hash_builder: Arc, - epoch: u64, - ) -> BucketArray { - BucketArray { - buckets: 
vec![Atomic::null(); (capacity * 2).next_power_of_two()], - len: AtomicUsize::new(0), - next_array: Atomic::null(), - hash_builder, - epoch, - } - } - - fn get_hash(&self, key: &K) -> u64 { - let mut hasher = self.hash_builder.build_hasher(); - key.hash(&mut hasher); - - hasher.finish() - } -} - -// set on bucket pointers if this bucket has been moved into the next array -const REDIRECT_TAG: usize = 1; - -// this might seem complex -- you're right, but i wanted to add another tag -// it ended up not being useful, but the code was already written... -trait Tag { - fn has_redirect(self) -> bool; - fn with_redirect(self) -> Self; - fn without_redirect(self) -> Self; -} - -impl Tag for usize { - fn has_redirect(self) -> bool { - (self & REDIRECT_TAG) != 0 - } - - fn with_redirect(self) -> Self { - self | REDIRECT_TAG + key: K, + value: V, + on_modify: F, + with_old_value: G, + ) -> Option { + self.insert_with_or_modify_entry_and( + key, + move || value, + on_modify, + move |_, v| with_old_value(v), + ) } - fn without_redirect(self) -> Self { - self & !REDIRECT_TAG + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return the result of invoking `with_old_entry` with + /// the previous entry. + /// + /// If there is no value associated with `key`, `with_old_value` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn insert_or_modify_entry_and V, G: FnOnce(&K, &V) -> T, T>( + &self, + key: K, + value: V, + on_modify: F, + with_old_entry: G, + ) -> Option { + self.insert_with_or_modify_entry_and(key, move || value, on_modify, with_old_entry) } -} -impl<'g, K: Hash + Eq, V, S: BuildHasher> BucketArray { - fn get( + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return the result of invoking + /// `with_old_value` with the previously associated value. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked, + /// `with_old_value` will not be invoked, and [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// Similarly, `on_insert` may be invoked if [`Some`] is returned. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + pub fn insert_with_or_modify_and< + F: FnOnce() -> V, + G: FnMut(&K, &V) -> V, + H: FnOnce(&V) -> T, + T, + >( &self, - key: &Q, - hash: u64, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) - where - K: Borrow, - { - let self_ptr = (self as *const BucketArray).into(); - - let capacity = self.buckets.len(); - let offset = (hash & (self.buckets.len() - 1) as u64) as usize; - - for this_bucket_ptr in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - .map(|this_bucket| this_bucket.load_consume(guard)) - { - if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { - if this_bucket_ref.key.borrow() != key { - continue; - } else if !this_bucket_ptr.tag().has_redirect() { - return (this_bucket_ptr, self_ptr); - } - - // consume load from this_bucket isn't 
strong enough to publish - // writes to *self.next_array - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); - - return next_array.get(key, hash, guard); - } else { - return (Shared::null(), self_ptr); - } - } - - (Shared::null(), self_ptr) + key: K, + on_insert: F, + on_modify: G, + with_old_value: H, + ) -> Option { + self.insert_with_or_modify_entry_and(key, on_insert, on_modify, move |_, v| { + with_old_value(v) + }) } - fn insert( + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return the result of invoking + /// `with_old_entry` with the previous entry. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked, + /// `with_old_value` will not be invoked, and [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// Similarly, `on_insert` may be invoked if [`Some`] is returned. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + pub fn insert_with_or_modify_entry_and< + F: FnOnce() -> V, + G: FnMut(&K, &V) -> V, + H: FnOnce(&K, &V) -> T, + T, + >( &self, - bucket_ptr: Shared<'g, Bucket>, - hash: u64, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) { - assert!(!bucket_ptr.is_null()); - - let bucket = unsafe { bucket_ptr.deref() }; - let capacity = self.buckets.len(); - let len = self.len.load(Ordering::Relaxed); - - // grow if inserting would push us over a load factor of 0.5 - if (len + 1) > (capacity / 2) { - let next_array = self.grow(guard); - - return next_array.insert(bucket_ptr, hash, guard); - } - - let grow_into_and_insert_into_next = || { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - // self.grow_into(next_array, guard); - - next_array.insert(bucket_ptr, hash, guard) - }; - - let offset = (hash & (capacity - 1) as u64) as usize; - let mut have_seen_redirect = false; + key: K, + on_insert: F, + mut on_modify: G, + with_old_entry: H, + ) -> Option { + let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); + let mut state = InsertOrModifyState::New(key, on_insert); - for this_bucket in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - loop { - let this_bucket_ptr = this_bucket.load_consume(guard); + let result; - have_seen_redirect = have_seen_redirect || this_bucket_ptr.tag().has_redirect(); + loop { + while self.len.load(Ordering::Relaxed) > bucket_array_ref.capacity() { + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } - let should_increment_len = - if let Some(this_bucket_ref) = unsafe { 
this_bucket_ptr.as_ref() } { - if this_bucket_ref.key != bucket.key { - break; + match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + if previous_bucket_ptr.tag() & bucket::TOMBSTONE_TAG != 0 { + self.len.fetch_add(1, Ordering::Relaxed); + result = None; + } else { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + result = Some(with_old_entry(key, unsafe { &*value.as_ptr() })); } - this_bucket_ref.is_tombstone() + unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) }; } else { - true - }; - - if this_bucket_ptr.tag().has_redirect() { - return grow_into_and_insert_into_next(); - } - - if this_bucket - .compare_and_set_weak( - this_bucket_ptr, - bucket_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) - .is_ok() - { - if should_increment_len { - // replaced a tombstone self.len.fetch_add(1, Ordering::Relaxed); + result = None; } - return ( - this_bucket_ptr, - (self as *const BucketArray).into(), - ); + break; + } + Err((s, f)) => { + state = s; + on_modify = f; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); } } } - if have_seen_redirect { - grow_into_and_insert_into_next() - } else { - let next_array = self.grow(guard); + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - next_array.insert(bucket_ptr, hash, guard) - } + result } - fn remove( - &self, - key: &Q, - hash: u64, - mut maybe_new_bucket: Option>>, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return a copy + /// of the previously associated value. + /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. 
+ /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. `V` must implement [`Clone`], as other + /// threads may hold references to the associated value. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn modify V>(&self, key: K, on_modify: F) -> Option where - K: Clone + Borrow, + V: Clone, { - let self_shared = (self as *const BucketArray).into(); - - let capacity = self.buckets.len(); - let offset = (hash & (capacity - 1) as u64) as usize; - - for this_bucket in (0..self.buckets.len()) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { - if this_bucket_ref.key.borrow() != key { - // hash collision - continue; - } - } - - loop { - if this_bucket_ptr.tag().has_redirect() { - let next_array = unsafe { self.get_next_unchecked(guard) }; - - return next_array.remove(key, hash, maybe_new_bucket, guard); - } - - let this_bucket_ref = - if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { - if this_bucket_ref.is_tombstone() { - return (Shared::null(), self_shared); - } - - this_bucket_ref - } else { - return (Shared::null(), self_shared); - }; - - let new_bucket = match maybe_new_bucket.take() { - Some(b) => b, - None => Owned::new(Bucket { - key: this_bucket_ref.key.clone(), - maybe_value: None, - }), - }; - - match this_bucket.compare_and_set_weak( - this_bucket_ptr, - new_bucket, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => { - self.len.fetch_sub(1, Ordering::Relaxed); - - return (this_bucket_ptr, self_shared); - } - Err(e) => { - maybe_new_bucket = Some(e.new); - let 
current_bucket_ptr = this_bucket.load_consume(guard); - - // check is only necessary once -- keys never change - // after being inserted/removed - if this_bucket_ptr.is_null() - && unsafe { current_bucket_ptr.as_ref() } - .map(|b| b.key.borrow() != key) - .unwrap_or(false) - { - break; - } - - this_bucket_ptr = current_bucket_ptr; - } - } - } - } - - (Shared::null(), self_shared) + self.modify_entry_and(key, on_modify, |_, v| v.clone()) } - fn modify V>( - &self, - key: &Q, - hash: u64, - mut modifier: F, - maybe_new_bucket_ptr: Option>>, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return a copy + /// of the previously entry. + /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. `K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn modify_entry V>(&self, key: K, on_modify: F) -> Option<(K, V)> where - K: Clone + Borrow, + K: Clone, + V: Clone, { - let self_shared: Shared<'g, BucketArray> = (self as *const Self).into(); - - let capacity = self.buckets.len(); - let offset = (hash & (self.buckets.len() - 1) as u64) as usize; - - for this_bucket in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - let mut this_bucket_ref = match unsafe { this_bucket_ptr.as_ref() } { - Some(b) => { - // buckets will never have their key changed, so we only have to - // make this check once - if b.key.borrow() != key { - continue; - } - - b - } - None => return (Shared::null(), self_shared), - }; - - let mut new_bucket_ptr = maybe_new_bucket_ptr.unwrap_or_else(|| { - Owned::new(Bucket { - key: this_bucket_ref.key.clone(), - maybe_value: None, - }) - }); - - loop { - if this_bucket_ptr.tag().has_redirect() { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); - - return next_array.modify(key, hash, modifier, Some(new_bucket_ptr), guard); - } - - let old_value = match this_bucket_ref.maybe_value.as_ref() { - Some(v) => v, - None => return (Shared::null(), self_shared), - }; - - new_bucket_ptr.maybe_value = Some(modifier(old_value)); - - // i assume that a strong CAS is less expensive than invoking - // modifier a second time - match this_bucket.compare_and_set( - this_bucket_ptr, - new_bucket_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => return (this_bucket_ptr, 
self_shared), - Err(e) => { - new_bucket_ptr = e.new; - - this_bucket_ptr = this_bucket.load_consume(guard); - assert!(!this_bucket_ptr.is_null()); - - this_bucket_ref = unsafe { this_bucket_ptr.deref() }; - } - } - } - } - - (Shared::null(), self_shared) + self.modify_entry_and(key, on_modify, |k, v| (k.clone(), v.clone())) } - fn insert_or_modify V, F: FnMut(&V) -> V>( + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return the + /// result of invoking `with_old_value` with the previously associated + /// value. + /// + /// If there is no value associated with `key`, `with_old_value` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + pub fn modify_and V, G: FnOnce(&V) -> T, T>( &self, - mut key_or_bucket: KeyOrBucket, - hash: u64, - mut inserter: FunctionOrValue, - mut modifier: F, - guard: &'g Guard, - ) -> BucketAndParent<'g, K, V, S> { - let self_shared = (self as *const BucketArray).into(); - - let capacity = self.buckets.len(); - let len = self.len.load(Ordering::Relaxed); - - let insert_into_next = |key_or_bucket, inserter, modifier| { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); - - next_array.insert_or_modify(key_or_bucket, hash, inserter, modifier, guard) - }; - - // grow if inserting would push us over a load factor of 0.5 - if (len + 1) > (capacity / 2) { - let next_array = self.grow(guard); - - return 
next_array.insert_or_modify(key_or_bucket, hash, inserter, modifier, guard); - } - - let offset = (hash & (capacity - 1) as u64) as usize; - let mut have_seen_redirect = false; - - for this_bucket in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - have_seen_redirect = have_seen_redirect || this_bucket_ptr.tag().has_redirect(); - - // if the bucket pointer is non-null and its key does not match, move to the next bucket - if !this_bucket_ptr.tag().has_redirect() - && unsafe { this_bucket_ptr.as_ref() } - .map(|this_bucket_ref| &this_bucket_ref.key != key_or_bucket.as_key()) - .unwrap_or(false) - { - continue; - } + key: K, + on_modify: F, + with_old_value: G, + ) -> Option { + self.modify_entry_and(key, on_modify, move |_, v| with_old_value(v)) + } - loop { - if this_bucket_ptr.tag().has_redirect() { - return insert_into_next(key_or_bucket, inserter, modifier); - } + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return the + /// result of invoking `with_old_value` with the previous entry. + /// + /// If there is no value associated with `key`, `with_old_value` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + pub fn modify_entry_and V, G: FnOnce(&K, &V) -> T, T>( + &self, + key: K, + mut on_modify: F, + with_old_entry: G, + ) -> Option { + let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); + let mut key_or_owned_bucket = KeyOrOwnedBucket::Key(key); - let new_value = unsafe { this_bucket_ptr.as_ref() } - .and_then(|this_bucket_ref| this_bucket_ref.maybe_value.as_ref()) - .map(&mut modifier) - .unwrap_or_else(|| inserter.into_value()); - let new_bucket_ptr = key_or_bucket.into_bucket_with_value(new_value); - - // i assume it is more expensive to invoke modifier than it is - // to strong CAS - match this_bucket.compare_and_set( - this_bucket_ptr, - new_bucket_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => { - if this_bucket_ptr.is_null() { - self.len.fetch_add(1, Ordering::Relaxed); - } + let result; - return BucketAndParent { - bucket: this_bucket_ptr, - parent: self_shared, - }; + loop { + match bucket_array_ref.modify(guard, hash, key_or_owned_bucket, on_modify) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + result = Some(with_old_entry(key, unsafe { &*value.as_ptr() })); + + unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) }; + } else { + result = None; } - Err(mut e) => { - inserter = FunctionOrValue::Value(e.new.maybe_value.take().unwrap()); - key_or_bucket = KeyOrBucket::Bucket(e.new); - - have_seen_redirect = have_seen_redirect || e.current.tag().has_redirect(); - - // if another thread inserted into this bucket, check to see if its key - // 
matches the one we are trying to insert/modify - if this_bucket_ptr.is_null() - && !e.current.tag().has_redirect() - && unsafe { e.current.as_ref() } - .map(|this_bucket_ref| { - &this_bucket_ref.key != key_or_bucket.as_key() - }) - .unwrap_or(false) - { - continue; - } - this_bucket_ptr = this_bucket.load_consume(guard); - } + break; + } + Err((kb, f)) => { + key_or_owned_bucket = kb; + on_modify = f; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); } } } - let next_array = if have_seen_redirect { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - next_array - } else { - self.grow(guard) - }; - - next_array.insert_or_modify(key_or_bucket, hash, inserter, modifier, guard) + result } +} - fn grow(&self, guard: &'g Guard) -> &'g BucketArray { - let maybe_next_array_ptr = self.next_array.load_consume(guard); - - if let Some(next_array) = unsafe { maybe_next_array_ptr.as_ref() } { - self.grow_into(next_array, guard); +impl HashMap { + fn bucket_array<'g>(&self, guard: &'g Guard) -> &'g BucketArray { + const DEFAULT_CAPACITY: usize = 64; - return next_array; - } + let mut maybe_new_bucket_array = None; - let allocated_array_ptr = Owned::new(BucketArray::with_capacity_hasher_and_epoch( - self.buckets.len() * 2, - self.hash_builder.clone(), - self.epoch + 1, - )); - - let next_array_ptr = match self.next_array.compare_and_set( - Shared::null(), - allocated_array_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(next_array_ptr) => next_array_ptr, - Err(_) => self.next_array.load_consume(guard), - }; + loop { + let bucket_array_ptr = self.bucket_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; + if let Some(bucket_array_ref) = unsafe { bucket_array_ptr.as_ref() 
} { + return bucket_array_ref; + } - self.grow_into(next_array, guard); + let new_bucket_array = maybe_new_bucket_array + .unwrap_or_else(|| Owned::new(BucketArray::with_capacity(0, DEFAULT_CAPACITY))); - next_array + match self.bucket_array.compare_and_set_weak( + Shared::null(), + new_bucket_array, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(b) => return unsafe { b.as_ref() }.unwrap(), + Err(CompareAndSetError { new, .. }) => maybe_new_bucket_array = Some(new), + } + } } - fn grow_into(&self, next_array: &'g BucketArray, guard: &'g Guard) { - for this_bucket in self.buckets.iter() { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - // if we insert a bucket that is then tombstone-d, we need to - // insert into the new bucket arrays - let mut maybe_hash = None; - - loop { - if this_bucket_ptr.tag().has_redirect() { - break; - } - - // if we already inserted this bucket, or if this bucket is - // non-null and not a tombstone - if maybe_hash.is_some() - || !unsafe { this_bucket_ptr.as_ref() } - .map(Bucket::is_tombstone) - .unwrap_or(true) - { - assert!(!this_bucket_ptr.is_null()); + fn swing_bucket_array<'g>( + &self, + guard: &'g Guard, + mut current_ref: &'g BucketArray, + min_ref: &'g BucketArray, + ) { + let min_epoch = min_ref.epoch; - let hash = maybe_hash.unwrap_or_else(|| { - let key = unsafe { &this_bucket_ptr.deref().key }; + let mut current_ptr = (current_ref as *const BucketArray).into(); + let min_ptr: Shared<'g, _> = (min_ref as *const BucketArray).into(); - self.get_hash(key) - }); + loop { + if current_ref.epoch >= min_epoch { + return; + } - next_array.insert(this_bucket_ptr, hash, guard); - maybe_hash = Some(hash); - } + match self.bucket_array.compare_and_set_weak( + current_ptr, + min_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(_) => unsafe { bucket::defer_acquire_destroy(guard, current_ptr) }, + Err(_) => { + let new_ptr = self.bucket_array.load_consume(guard); + 
assert!(!new_ptr.is_null()); - // strong CAS to avoid spurious duplicate re-insertions - match this_bucket.compare_and_set( - this_bucket_ptr, - this_bucket_ptr.with_tag(this_bucket_ptr.tag().with_redirect()), - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => break, - Err(_) => this_bucket_ptr = this_bucket.load_consume(guard), + current_ptr = new_ptr; + current_ref = unsafe { new_ptr.as_ref() }.unwrap(); } } } } - - unsafe fn get_next_unchecked(&self, guard: &'g Guard) -> &'g BucketArray { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - - let next_array = next_array_ptr.deref(); - self.grow_into(next_array, guard); - - next_array - } } -struct BucketAndParent<'a, K: Hash + Eq, V, S: BuildHasher> { - bucket: Shared<'a, Bucket>, - parent: Shared<'a, BucketArray>, -} - -#[repr(align(2))] -struct Bucket { - key: K, - maybe_value: Option, -} - -enum FunctionOrValue T, T> { - Function(F), - Value(T), -} - -impl T, T> FunctionOrValue { - fn into_value(self) -> T { - match self { - FunctionOrValue::Function(f) => f(), - FunctionOrValue::Value(v) => v, - } - } -} +impl Drop for HashMap { + fn drop(&mut self) { + let guard = unsafe { &crossbeam_epoch::unprotected() }; + atomic::fence(Ordering::Acquire); -impl Bucket { - fn is_tombstone(&self) -> bool { - self.maybe_value.is_none() - } -} + let mut current_ptr = self.bucket_array.load(Ordering::Relaxed, guard); -enum KeyOrBucket { - Key(K), - Bucket(Owned>), -} + while let Some(current_ref) = unsafe { current_ptr.as_ref() } { + let next_ptr = current_ref.next.load(Ordering::Relaxed, guard); -impl KeyOrBucket { - fn into_bucket_with_value(self, value: V) -> Owned> { - match self { - KeyOrBucket::Key(key) => Owned::new(Bucket { - key, - maybe_value: Some(value), - }), - KeyOrBucket::Bucket(mut bucket_ptr) => { - bucket_ptr.maybe_value = Some(value); - - bucket_ptr + for this_bucket_ptr in current_ref + .buckets + .iter() + .map(|b| b.load(Ordering::Relaxed, 
guard)) + .filter(|p| !p.is_null()) + .filter(|p| next_ptr.is_null() || p.tag() & bucket::TOMBSTONE_TAG == 0) + { + // only delete tombstones from the newest bucket array + // the only way this becomes a memory leak is if there was a panic during a rehash, + // in which case i'm going to say that running destructors and freeing memory is + // best-effort, and my best effort is to not do it + unsafe { bucket::defer_acquire_destroy(guard, this_bucket_ptr) }; } - } - } - fn as_key(&self) -> &K { - match self { - KeyOrBucket::Key(key) => &key, - KeyOrBucket::Bucket(bucket) => &bucket.key, + unsafe { bucket::defer_acquire_destroy(guard, current_ptr) }; + + current_ptr = next_ptr; } } } diff --git a/src/map/bucket.rs b/src/map/bucket.rs new file mode 100644 index 0000000..8d09d53 --- /dev/null +++ b/src/map/bucket.rs @@ -0,0 +1,798 @@ +// MIT License +// +// Copyright (c) 2020 Gregory Meyer +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation files +// (the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of the Software, +// and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +use std::{ + borrow::Borrow, + hash::{BuildHasher, Hash, Hasher}, + mem::{self, MaybeUninit}, + ptr, + sync::atomic::{self, Ordering}, +}; + +use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared}; + +pub(crate) struct BucketArray { + pub(crate) buckets: Box<[Atomic>]>, + pub(crate) next: Atomic>, + pub(crate) epoch: usize, +} + +impl BucketArray { + pub(crate) fn with_capacity(epoch: usize, capacity: usize) -> Self { + let real_capacity = (capacity * 2).next_power_of_two(); + let mut buckets = Vec::with_capacity(real_capacity); + + for _ in 0..real_capacity { + buckets.push(Atomic::null()); + } + + let buckets = buckets.into_boxed_slice(); + + Self { + buckets, + next: Atomic::null(), + epoch, + } + } + + pub(crate) fn capacity(&self) -> usize { + assert!(self.buckets.len().is_power_of_two()); + + self.buckets.len() / 2 + } +} + +impl<'g, K: 'g + Eq, V: 'g> BucketArray { + pub(crate) fn get( + &self, + guard: &'g Guard, + hash: u64, + key: &Q, + ) -> Result>, RelocatedError> + where + K: Borrow, + { + let loop_result = self.probe_loop(guard, hash, |_, _, this_bucket_ptr| { + let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } + { + this_bucket_ref + } else { + return ProbeLoopAction::Return(Shared::null()); + }; + + let this_key = &this_bucket_ref.key; + + if this_key.borrow() != key { + return ProbeLoopAction::Continue; + } + + let result_ptr = if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 { + this_bucket_ptr + } else { + Shared::null() + }; + + ProbeLoopAction::Return(result_ptr) + }); + + match loop_result { + ProbeLoopResult::Returned(t) => Ok(t), + ProbeLoopResult::LoopEnded => Ok(Shared::null()), + ProbeLoopResult::FoundSentinelTag => Err(RelocatedError), + } + } + + pub(crate) fn insert( + &self, + guard: &'g Guard, + hash: u64, + bucket_ptr: Owned>, + ) -> Result>, Owned>> { + let mut maybe_bucket_ptr = Some(bucket_ptr); + + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, 
this_bucket_ptr| { + let bucket_ptr = maybe_bucket_ptr.take().unwrap(); + let key = &bucket_ptr.key; + + if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } { + if this_key != key { + maybe_bucket_ptr = Some(bucket_ptr); + + return ProbeLoopAction::Continue; + } + } + + match this_bucket.compare_and_set_weak( + this_bucket_ptr, + bucket_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(_) => ProbeLoopAction::Return(this_bucket_ptr), + Err(CompareAndSetError { new, .. }) => { + maybe_bucket_ptr = Some(new); + + ProbeLoopAction::Reload + } + } + }); + + loop_result + .returned() + .ok_or_else(|| maybe_bucket_ptr.unwrap()) + } + + pub(crate) fn remove_if bool>( + &self, + guard: &'g Guard, + hash: u64, + key: &Q, + mut condition: F, + ) -> Result>, F> + where + K: Borrow, + { + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| { + let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } + { + this_bucket_ref + } else { + return ProbeLoopAction::Return(Shared::null()); + }; + + let this_key = &this_bucket_ref.key; + + if this_key.borrow() != key { + return ProbeLoopAction::Continue; + } else if this_bucket_ptr.tag() & TOMBSTONE_TAG != 0 { + return ProbeLoopAction::Return(Shared::null()); + } + + let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() }; + + if !condition(this_key, this_value) { + return ProbeLoopAction::Return(Shared::null()); + } + + let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG); + + match this_bucket.compare_and_set_weak( + this_bucket_ptr, + new_bucket_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(_) => ProbeLoopAction::Return(new_bucket_ptr), + Err(_) => ProbeLoopAction::Reload, + } + }); + + match loop_result { + ProbeLoopResult::Returned(t) => Ok(t), + ProbeLoopResult::LoopEnded => Ok(Shared::null()), + ProbeLoopResult::FoundSentinelTag => Err(condition), + } + } + + pub(crate) fn modify V>( + 
&self, + guard: &'g Guard, + hash: u64, + key_or_owned_bucket: KeyOrOwnedBucket, + mut modifier: F, + ) -> Result>, (KeyOrOwnedBucket, F)> { + let mut maybe_key_or_owned_bucket = Some(key_or_owned_bucket); + + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| { + let key_or_owned_bucket = maybe_key_or_owned_bucket.take().unwrap(); + + let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } + { + this_bucket_ref + } else { + maybe_key_or_owned_bucket = Some(key_or_owned_bucket); + + return ProbeLoopAction::Return(Shared::null()); + }; + + let this_key = &this_bucket_ref.key; + let key = key_or_owned_bucket.key(); + + if key != this_key { + maybe_key_or_owned_bucket = Some(key_or_owned_bucket); + + return ProbeLoopAction::Continue; + } + + if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 { + let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() }; + let new_value = modifier(this_key, this_value); + let new_bucket = key_or_owned_bucket.into_bucket(new_value); + + if let Err(CompareAndSetError { new, .. 
}) = this_bucket.compare_and_set_weak( + this_bucket_ptr, + new_bucket, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + maybe_key_or_owned_bucket = Some(KeyOrOwnedBucket::OwnedBucket(new)); + + ProbeLoopAction::Reload + } else { + ProbeLoopAction::Return(this_bucket_ptr) + } + } else { + ProbeLoopAction::Return(Shared::null()) + } + }); + + loop_result + .returned() + .ok_or_else(|| (maybe_key_or_owned_bucket.unwrap(), modifier)) + } + + pub(crate) fn insert_or_modify V, G: FnMut(&K, &V) -> V>( + &self, + guard: &'g Guard, + hash: u64, + state: InsertOrModifyState, + mut modifier: G, + ) -> Result>, (InsertOrModifyState, G)> { + let mut maybe_state = Some(state); + + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| { + let state = maybe_state.take().unwrap(); + + let (new_bucket, maybe_insert_value) = + if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { + let this_key = &this_bucket_ref.key; + + if this_key != state.key() { + maybe_state = Some(state); + + return ProbeLoopAction::Continue; + } + + if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 { + let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() }; + let new_value = modifier(this_key, this_value); + + let (new_bucket, insert_value) = state.into_modify_bucket(new_value); + + (new_bucket, Some(insert_value)) + } else { + (state.into_insert_bucket(), None) + } + } else { + (state.into_insert_bucket(), None) + }; + + if let Err(CompareAndSetError { new, .. 
}) = this_bucket.compare_and_set_weak( + this_bucket_ptr, + new_bucket, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + maybe_state = Some(InsertOrModifyState::from_bucket_value( + new, + maybe_insert_value, + )); + + ProbeLoopAction::Reload + } else { + ProbeLoopAction::Return(this_bucket_ptr) + } + }); + + loop_result + .returned() + .ok_or_else(|| (maybe_state.unwrap(), modifier)) + } + + fn insert_for_grow( + &self, + guard: &'g Guard, + hash: u64, + bucket_ptr: Shared<'g, Bucket>, + ) -> Option { + assert!(!bucket_ptr.is_null()); + assert_eq!(bucket_ptr.tag() & SENTINEL_TAG, 0); + assert_ne!(bucket_ptr.tag() & BORROWED_TAG, 0); + + let key = &unsafe { bucket_ptr.deref() }.key; + + let loop_result = self.probe_loop(guard, hash, |i, this_bucket, this_bucket_ptr| { + if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } { + if this_bucket_ptr == bucket_ptr { + return ProbeLoopAction::Return(None); + } else if this_key != key { + return ProbeLoopAction::Continue; + } else if this_bucket_ptr.tag() & BORROWED_TAG == 0 { + return ProbeLoopAction::Return(None); + } + } + + if this_bucket_ptr.is_null() && bucket_ptr.tag() & TOMBSTONE_TAG != 0 { + ProbeLoopAction::Return(None) + } else if this_bucket + .compare_and_set_weak( + this_bucket_ptr, + bucket_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) + .is_ok() + { + ProbeLoopAction::Return(Some(i)) + } else { + ProbeLoopAction::Reload + } + }); + + loop_result.returned().flatten() + } +} + +impl<'g, K: 'g, V: 'g> BucketArray { + fn probe_loop< + F: FnMut(usize, &Atomic>, Shared<'g, Bucket>) -> ProbeLoopAction, + T, + >( + &self, + guard: &'g Guard, + hash: u64, + mut f: F, + ) -> ProbeLoopResult { + let offset = hash as usize & (self.buckets.len() - 1); + + for i in + (0..self.buckets.len()).map(|i| (i.wrapping_add(offset)) & (self.buckets.len() - 1)) + { + let this_bucket = &self.buckets[i]; + + loop { + let this_bucket_ptr = this_bucket.load_consume(guard); + + if 
this_bucket_ptr.tag() & SENTINEL_TAG != 0 { + return ProbeLoopResult::FoundSentinelTag; + } + + match f(i, this_bucket, this_bucket_ptr) { + ProbeLoopAction::Continue => break, + ProbeLoopAction::Reload => (), + ProbeLoopAction::Return(t) => return ProbeLoopResult::Returned(t), + } + } + } + + ProbeLoopResult::LoopEnded + } + + pub(crate) fn rehash( + &self, + guard: &'g Guard, + build_hasher: &H, + ) -> &'g BucketArray + where + K: Hash + Eq, + { + let next_array = self.next_array(guard); + assert!(self.buckets.len() <= next_array.buckets.len()); + + for this_bucket in self.buckets.iter() { + let mut maybe_state: Option<(usize, Shared<'g, Bucket>)> = None; + + loop { + let this_bucket_ptr = this_bucket.load_consume(guard); + + if this_bucket_ptr.tag() & SENTINEL_TAG != 0 { + break; + } + + let to_put_ptr = this_bucket_ptr.with_tag(this_bucket_ptr.tag() | BORROWED_TAG); + + if let Some((index, mut next_bucket_ptr)) = maybe_state { + assert!(!this_bucket_ptr.is_null()); + + let next_bucket = &next_array.buckets[index]; + + while next_bucket_ptr.tag() & BORROWED_TAG != 0 + && next_bucket + .compare_and_set_weak( + next_bucket_ptr, + to_put_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) + .is_err() + { + next_bucket_ptr = next_bucket.load_consume(guard); + } + } else if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { + let key = &this_bucket_ref.key; + let hash = hash(build_hasher, key); + + if let Some(index) = next_array.insert_for_grow(guard, hash, to_put_ptr) { + maybe_state = Some((index, to_put_ptr)); + } + } + + if this_bucket + .compare_and_set_weak( + this_bucket_ptr, + Shared::null().with_tag(SENTINEL_TAG), + (Ordering::Release, Ordering::Relaxed), + guard, + ) + .is_ok() + { + if !this_bucket_ptr.is_null() + && this_bucket_ptr.tag() & TOMBSTONE_TAG != 0 + && maybe_state.is_none() + { + unsafe { defer_destroy_bucket(guard, this_bucket_ptr) }; + } + + break; + } + } + } + + next_array + } + + fn next_array(&self, guard: &'g 
Guard) -> &'g BucketArray { + let mut maybe_new_next = None; + + loop { + let next_ptr = self.next.load_consume(guard); + + if let Some(next_ref) = unsafe { next_ptr.as_ref() } { + return next_ref; + } + + let new_next = maybe_new_next.unwrap_or_else(|| { + Owned::new(BucketArray::with_capacity( + self.epoch + 1, + self.buckets.len(), + )) + }); + + match self.next.compare_and_set_weak( + Shared::null(), + new_next, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(p) => return unsafe { p.deref() }, + Err(CompareAndSetError { new, .. }) => { + maybe_new_next = Some(new); + } + } + } + } +} + +#[repr(align(8))] +#[derive(Debug)] +pub(crate) struct Bucket { + pub(crate) key: K, + pub(crate) maybe_value: MaybeUninit, +} + +impl Bucket { + pub(crate) fn new(key: K, value: V) -> Bucket { + Bucket { + key, + maybe_value: MaybeUninit::new(value), + } + } +} + +#[derive(Debug, Eq, PartialEq)] +pub(crate) struct RelocatedError; + +pub(crate) enum KeyOrOwnedBucket { + Key(K), + OwnedBucket(Owned>), +} + +impl KeyOrOwnedBucket { + fn key(&self) -> &K { + match self { + Self::Key(k) => k, + Self::OwnedBucket(b) => &b.key, + } + } + + fn into_bucket(self, value: V) -> Owned> { + match self { + Self::Key(k) => Owned::new(Bucket::new(k, value)), + Self::OwnedBucket(mut b) => { + unsafe { + mem::drop( + mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init(), + ) + }; + + b + } + } + } +} + +pub(crate) enum InsertOrModifyState V> { + New(K, F), + AttemptedInsertion(Owned>), + AttemptedModification(Owned>, ValueOrFunction), +} + +impl V> InsertOrModifyState { + fn from_bucket_value( + bucket: Owned>, + value_or_function: Option>, + ) -> Self { + if let Some(value_or_function) = value_or_function { + Self::AttemptedModification(bucket, value_or_function) + } else { + Self::AttemptedInsertion(bucket) + } + } + + fn key(&self) -> &K { + match self { + InsertOrModifyState::New(k, _) => &k, + InsertOrModifyState::AttemptedInsertion(b) + | 
InsertOrModifyState::AttemptedModification(b, _) => &b.key, + } + } + + fn into_insert_bucket(self) -> Owned> { + match self { + InsertOrModifyState::New(k, f) => Owned::new(Bucket::new(k, f())), + InsertOrModifyState::AttemptedInsertion(b) => b, + InsertOrModifyState::AttemptedModification(mut b, v_or_f) => { + unsafe { + mem::drop( + mem::replace(&mut b.maybe_value, MaybeUninit::new(v_or_f.into_value())) + .assume_init(), + ) + }; + + b + } + } + } + + fn into_modify_bucket(self, value: V) -> (Owned>, ValueOrFunction) { + match self { + InsertOrModifyState::New(k, f) => ( + Owned::new(Bucket::new(k, value)), + ValueOrFunction::Function(f), + ), + InsertOrModifyState::AttemptedInsertion(mut b) => { + let insert_value = unsafe { + mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init() + }; + + (b, ValueOrFunction::Value(insert_value)) + } + InsertOrModifyState::AttemptedModification(mut b, v_or_f) => { + unsafe { + mem::drop( + mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init(), + ) + }; + + (b, v_or_f) + } + } + } +} + +pub(crate) enum ValueOrFunction V> { + Value(V), + Function(F), +} + +impl V> ValueOrFunction { + fn into_value(self) -> V { + match self { + ValueOrFunction::Value(v) => v, + ValueOrFunction::Function(f) => f(), + } + } +} + +pub(crate) fn hash(build_hasher: &H, key: &K) -> u64 { + let mut hasher = build_hasher.build_hasher(); + key.hash(&mut hasher); + + hasher.finish() +} + +enum ProbeLoopAction { + Continue, + Reload, + Return(T), +} + +enum ProbeLoopResult { + LoopEnded, + FoundSentinelTag, + Returned(T), +} + +impl ProbeLoopResult { + fn returned(self) -> Option { + match self { + Self::Returned(t) => Some(t), + Self::LoopEnded | Self::FoundSentinelTag => None, + } + } +} + +pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>( + guard: &'g Guard, + mut ptr: Shared<'g, Bucket>, +) { + assert!(!ptr.is_null()); + + guard.defer_unchecked(move || { + atomic::fence(Ordering::Acquire); + + if ptr.tag() & 
TOMBSTONE_TAG == 0 { + ptr::drop_in_place(ptr.deref_mut().maybe_value.as_mut_ptr()); + } + + mem::drop(ptr.into_owned()); + }); +} + +pub(crate) unsafe fn defer_destroy_tombstone<'g, K, V>( + guard: &'g Guard, + mut ptr: Shared<'g, Bucket>, +) { + assert!(!ptr.is_null()); + assert_ne!(ptr.tag() & TOMBSTONE_TAG, 0); + + atomic::fence(Ordering::Acquire); + // read the value now, but defer its destruction for later + let value = ptr::read(ptr.deref_mut().maybe_value.as_ptr()); + + // to be entirely honest, i don't know what order deferred functions are + // called in crossbeam-epoch. in the case that the deferred functions are + // called out of order, this prevents that from being an issue. + guard.defer_unchecked(move || mem::drop(value)); +} + +pub(crate) unsafe fn defer_acquire_destroy<'g, T>(guard: &'g Guard, ptr: Shared<'g, T>) { + assert!(!ptr.is_null()); + + guard.defer_unchecked(move || { + atomic::fence(Ordering::Acquire); + mem::drop(ptr.into_owned()); + }); +} + +pub(crate) const SENTINEL_TAG: usize = 0b001; // set on old table buckets when copied into a new table +pub(crate) const TOMBSTONE_TAG: usize = 0b010; // set when the value has been destroyed +pub(crate) const BORROWED_TAG: usize = 0b100; // set on new table buckets when copied from an old table + +#[cfg(test)] +mod tests { + use super::*; + + use ahash::RandomState; + + #[test] + fn get_insert_remove() { + let build_hasher = RandomState::new(); + let buckets = BucketArray::with_capacity(0, 8); + let guard = unsafe { &crossbeam_epoch::unprotected() }; + + let k1 = "foo"; + let h1 = hash(&build_hasher, k1); + let v1 = 5; + + let k2 = "bar"; + let h2 = hash(&build_hasher, k2); + let v2 = 10; + + let k3 = "baz"; + let h3 = hash(&build_hasher, k3); + let v3 = 15; + + assert_eq!(buckets.get(guard, h1, k1), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h2, k2), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + let b1 = Owned::new(Bucket::new(k1, 
v1)).into_shared(guard); + assert!(is_ok_null( + buckets.insert(guard, h1, unsafe { b1.into_owned() }) + )); + + assert_eq!(buckets.get(guard, h1, k1), Ok(b1)); + assert_eq!(buckets.get(guard, h2, k2), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + let b2 = Owned::new(Bucket::new(k2, v2)).into_shared(guard); + assert!(is_ok_null( + buckets.insert(guard, h2, unsafe { b2.into_owned() }) + )); + + assert_eq!(buckets.get(guard, h1, k1), Ok(b1)); + assert_eq!(buckets.get(guard, h2, k2), Ok(b2)); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + let b3 = Owned::new(Bucket::new(k3, v3)).into_shared(guard); + assert!(is_ok_null( + buckets.insert(guard, h3, unsafe { b3.into_owned() }) + )); + + assert_eq!(buckets.get(guard, h1, k1), Ok(b1)); + assert_eq!(buckets.get(guard, h2, k2), Ok(b2)); + assert_eq!(buckets.get(guard, h3, k3), Ok(b3)); + + assert_eq!( + buckets.remove_if(guard, h1, k1, |_, _| true).ok().unwrap(), + b1.with_tag(TOMBSTONE_TAG) + ); + unsafe { defer_destroy_tombstone(guard, b1.with_tag(TOMBSTONE_TAG)) }; + assert_eq!( + buckets.remove_if(guard, h2, k2, |_, _| true).ok().unwrap(), + b2.with_tag(TOMBSTONE_TAG) + ); + unsafe { defer_destroy_tombstone(guard, b2.with_tag(TOMBSTONE_TAG)) }; + assert_eq!( + buckets.remove_if(guard, h3, k3, |_, _| true).ok().unwrap(), + b3.with_tag(TOMBSTONE_TAG) + ); + unsafe { defer_destroy_tombstone(guard, b3.with_tag(TOMBSTONE_TAG)) }; + + assert_eq!(buckets.get(guard, h1, k1), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h2, k2), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + for this_bucket in buckets.buckets.iter() { + let this_bucket_ptr = this_bucket.swap(Shared::null(), Ordering::Relaxed, guard); + + if this_bucket_ptr.is_null() { + continue; + } + + unsafe { + defer_destroy_bucket(guard, this_bucket_ptr); + } + } + } + + fn is_ok_null<'g, K, V, E>(maybe_bucket_ptr: Result>, E>) -> bool { + if let Ok(bucket_ptr) = 
maybe_bucket_ptr { + bucket_ptr.is_null() + } else { + false + } + } +} diff --git a/src/map/tests.rs b/src/map/tests.rs new file mode 100644 index 0000000..bec0ae9 --- /dev/null +++ b/src/map/tests.rs @@ -0,0 +1,903 @@ +// MIT License +// +// Copyright (c) 2020 Gregory Meyer +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation files +// (the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of the Software, +// and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +mod util; + +use util::{DropNotifier, NoisyDropper}; + +use super::*; + +use std::{ + iter, + sync::{Arc, Barrier}, + thread::{self, JoinHandle}, +}; + +#[test] +fn insertion() { + const MAX_VALUE: i32 = 512; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, i), None); + + assert!(!map.is_empty()); + assert_eq!(map.len(), (i + 1) as usize); + + for j in 0..=i { + assert_eq!(map.get(&j), Some(j)); + assert_eq!(map.insert(j, j), Some(j)); + } + + for k in i + 1..MAX_VALUE { + assert_eq!(map.get(&k), None); + } + } +} + +#[test] +fn growth() { + const MAX_VALUE: i32 = 512; + + let map = HashMap::new(); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, i), None); + + assert!(!map.is_empty()); + assert_eq!(map.len(), (i + 1) as usize); + + for j in 0..=i { + assert_eq!(map.get(&j), Some(j)); + assert_eq!(map.insert(j, j), Some(j)); + } + + for k in i + 1..MAX_VALUE { + assert_eq!(map.get(&k), None); + } + } +} + +#[test] +fn concurrent_insertion() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; + + let map = Arc::new(HashMap::with_capacity(MAX_INSERTED_VALUE as usize)); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn concurrent_growth() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * 
MAX_VALUE; + + let map = Arc::new(HashMap::new()); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(|t| t.join()) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn removal() { + const MAX_VALUE: i32 = 512; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, i), None); + } + + for i in 0..MAX_VALUE { + assert_eq!(map.remove(&i), Some(i)); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn concurrent_removal() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; + + let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.insert(i, i), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.remove(&j), Some(j)); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(|t| t.join()) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), 0); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn concurrent_insertion_and_removal() { + const MAX_VALUE: i32 = 512; + 
const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; + const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; + + let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.insert(i, i), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); + + let insert_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + let remove_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.remove(&j), Some(j)); + } + }) + }) + .collect(); + + for result in insert_threads + .into_iter() + .chain(remove_threads.into_iter()) + .map(|t| t.join()) + { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), INSERTED_MIDPOINT as usize); + + for i in 0..INSERTED_MIDPOINT { + assert_eq!(map.get(&i), Some(i)); + } + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn concurrent_growth_and_removal() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; + const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; + + let map = HashMap::with_capacity(INSERTED_MIDPOINT as usize); + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.insert(i, i), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); + + let insert_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + 
let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + let remove_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.remove(&j), Some(j)); + } + }) + }) + .collect(); + + for result in insert_threads + .into_iter() + .chain(remove_threads.into_iter()) + .map(JoinHandle::join) + { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), INSERTED_MIDPOINT as usize); + + for i in 0..INSERTED_MIDPOINT { + assert_eq!(map.get(&i), Some(i)); + } + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn modify() { + let map = HashMap::new(); + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + assert_eq!(map.modify("foo", |_, x| x * 2), None); + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + map.insert("foo", 1); + assert_eq!(map.modify("foo", |_, x| x * 2), Some(1)); + + assert!(!map.is_empty()); + assert_eq!(map.len(), 1); + + map.remove("foo"); + assert_eq!(map.modify("foo", |_, x| x * 2), None); + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); +} + +#[test] +fn concurrent_modification() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; + + let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + map.insert(i, i); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (i as i32 * 
MAX_VALUE)..((i as i32 + 1) * MAX_VALUE) { + assert_eq!(map.modify(j, |_, x| x * 2), Some(j)); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), Some(i * 2)); + } +} + +#[test] +fn concurrent_overlapped_modification() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, 0), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for i in 0..MAX_VALUE { + assert!(map.modify(i, |_, x| x + 1).is_some()); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); + } +} + +#[test] +fn insert_or_modify() { + let map = HashMap::new(); + + assert_eq!(map.insert_or_modify("foo", 1, |_, x| x + 1), None); + assert_eq!(map.get("foo"), Some(1)); + + assert_eq!(map.insert_or_modify("foo", 1, |_, x| x + 1), Some(1)); + assert_eq!(map.get("foo"), Some(2)); +} + +#[test] +fn concurrent_insert_or_modify() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = Arc::new(HashMap::new()); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..MAX_VALUE { + map.insert_or_modify(j, 1, |_, x| x + 1); + } + }) + }) + .collect(); + + for result in 
threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); + } +} + +#[test] +fn concurrent_overlapped_insertion() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = Arc::new(HashMap::with_capacity(MAX_VALUE as usize)); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..MAX_VALUE { + map.insert(j, j); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn concurrent_overlapped_growth() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = Arc::new(HashMap::with_capacity(1)); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..MAX_VALUE { + map.insert(j, j); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn concurrent_overlapped_removal() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + map.insert(i, i); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { 
+ barrier.wait(); + + for j in 0..MAX_VALUE { + let prev_value = map.remove(&j); + + if let Some(v) = prev_value { + assert_eq!(v, j); + } + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn drop_value() { + let key_parent = Arc::new(DropNotifier::new()); + let value_parent = Arc::new(DropNotifier::new()); + + { + let map = HashMap::new(); + + assert_eq!( + map.insert_and( + NoisyDropper::new(key_parent.clone(), 0), + NoisyDropper::new(value_parent.clone(), 0), + |_| () + ), + None + ); + assert!(!map.is_empty()); + assert_eq!(map.len(), 1); + map.get_and(&0, |v| assert_eq!(v, &0)); + + map.remove_and(&0, |v| assert_eq!(v, &0)); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + assert_eq!(map.get_and(&0, |_| ()), None); + + util::run_deferred(); + + assert!(!key_parent.was_dropped()); + assert!(value_parent.was_dropped()); + } + + util::run_deferred(); + + assert!(key_parent.was_dropped()); + assert!(value_parent.was_dropped()); +} + +#[test] +fn drop_many_values() { + const NUM_VALUES: usize = 1 << 16; + + let key_parents: Vec<_> = iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(); + let value_parents: Vec<_> = iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(); + + { + let map = HashMap::new(); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + for (i, (this_key_parent, this_value_parent)) in + key_parents.iter().zip(value_parents.iter()).enumerate() + { + assert_eq!( + map.insert_and( + NoisyDropper::new(this_key_parent.clone(), i), + NoisyDropper::new(this_value_parent.clone(), i), + |_| () + ), + None + ); + + assert!(!map.is_empty()); + assert_eq!(map.len(), i + 1); + } + + for i in 0..NUM_VALUES { + assert_eq!( + map.get_key_value_and(&i, |k, v| { + assert_eq!(*k, i); + 
assert_eq!(*v, i); + }), + Some(()) + ); + } + + for i in 0..NUM_VALUES { + assert_eq!( + map.remove_entry_and(&i, |k, v| { + assert_eq!(*k, i); + assert_eq!(*v, i); + }), + Some(()) + ); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(!this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(this_value_parent.was_dropped()); + } + + for i in 0..NUM_VALUES { + assert_eq!(map.get_and(&i, |_| ()), None); + } + } + + util::run_deferred(); + + for this_key_parent in key_parents.into_iter() { + assert!(this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.into_iter() { + assert!(this_value_parent.was_dropped()); + } +} + +#[test] +fn drop_many_values_concurrent() { + const NUM_THREADS: usize = 64; + const NUM_VALUES_PER_THREAD: usize = 512; + const NUM_VALUES: usize = NUM_THREADS * NUM_VALUES_PER_THREAD; + + let key_parents: Arc> = Arc::new( + iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(), + ); + let value_parents: Arc> = Arc::new( + iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(), + ); + + { + let map = Arc::new(HashMap::new()); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let handles: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = Arc::clone(&map); + let barrier = Arc::clone(&barrier); + let key_parents = Arc::clone(&key_parents); + let value_parents = Arc::clone(&value_parents); + + thread::spawn(move || { + barrier.wait(); + + let these_key_parents = + &key_parents[i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD]; + let these_value_parents = + &value_parents[i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD]; + + for (j, (this_key_parent, this_value_parent)) in these_key_parents + .iter() + .zip(these_value_parents.iter()) + .enumerate() + 
{ + let key_value = i * NUM_VALUES_PER_THREAD + j; + + assert_eq!( + map.insert_and( + NoisyDropper::new(this_key_parent.clone(), key_value), + NoisyDropper::new(this_value_parent.clone(), key_value), + |_| () + ), + None + ); + } + }) + }) + .collect(); + + for result in handles.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), NUM_VALUES); + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(!this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(!this_value_parent.was_dropped()); + } + + for i in 0..NUM_VALUES { + assert_eq!( + map.get_key_value_and(&i, |k, v| { + assert_eq!(*k, i); + assert_eq!(*v, i); + }), + Some(()) + ); + } + + let handles: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = Arc::clone(&map); + let barrier = Arc::clone(&barrier); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..NUM_VALUES_PER_THREAD { + let key_value = i * NUM_VALUES_PER_THREAD + j; + + assert_eq!( + map.remove_entry_and(&key_value, |k, v| { + assert_eq!(*k, key_value); + assert_eq!(*v, key_value); + }), + Some(()) + ); + } + }) + }) + .collect(); + + for result in handles.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(!this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(this_value_parent.was_dropped()); + } + + for i in 0..NUM_VALUES { + assert_eq!(map.get_and(&i, |_| ()), None); + } + } + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(this_value_parent.was_dropped()); + } +} + +#[test] +fn remove_if() { + const NUM_VALUES: usize = 512; + + let is_even = |_: &usize, v: &usize| *v % 2 
== 0; + + let map = HashMap::new(); + + for i in 0..NUM_VALUES { + assert_eq!(map.insert(i, i), None); + } + + for i in 0..NUM_VALUES { + if is_even(&i, &i) { + assert_eq!(map.remove_if(&i, is_even), Some(i)); + } else { + assert_eq!(map.remove_if(&i, is_even), None); + } + } + + for i in (0..NUM_VALUES).filter(|i| i % 2 == 0) { + assert_eq!(map.get(&i), None); + } + + for i in (0..NUM_VALUES).filter(|i| i % 2 != 0) { + assert_eq!(map.get(&i), Some(i)); + } +} diff --git a/src/map/tests/util.rs b/src/map/tests/util.rs new file mode 100644 index 0000000..ad5af00 --- /dev/null +++ b/src/map/tests/util.rs @@ -0,0 +1,132 @@ +// MIT License +// +// Copyright (c) 2020 Gregory Meyer +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation files +// (the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of the Software, +// and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +use std::{ + borrow::{Borrow, BorrowMut}, + hash::{Hash, Hasher}, + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +#[derive(Debug)] +pub(crate) struct NoisyDropper { + parent: Arc, + pub elem: T, +} + +impl NoisyDropper { + pub(crate) fn new(parent: Arc, elem: T) -> Self { + Self { parent, elem } + } +} + +impl Drop for NoisyDropper { + fn drop(&mut self) { + assert_eq!(self.parent.dropped.swap(true, Ordering::Relaxed), false); + } +} + +impl PartialEq for NoisyDropper { + fn eq(&self, other: &Self) -> bool { + self.elem == other.elem + } +} + +impl PartialEq for NoisyDropper { + fn eq(&self, other: &T) -> bool { + &self.elem == other + } +} + +impl Eq for NoisyDropper {} + +impl Hash for NoisyDropper { + fn hash(&self, hasher: &mut H) { + self.elem.hash(hasher); + } +} + +impl AsRef for NoisyDropper { + fn as_ref(&self) -> &T { + &self.elem + } +} + +impl AsMut for NoisyDropper { + fn as_mut(&mut self) -> &mut T { + &mut self.elem + } +} + +impl Borrow for NoisyDropper { + fn borrow(&self) -> &T { + &self.elem + } +} + +impl BorrowMut for NoisyDropper { + fn borrow_mut(&mut self) -> &mut T { + &mut self.elem + } +} + +impl Deref for NoisyDropper { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.elem + } +} + +impl DerefMut for NoisyDropper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.elem + } +} + +#[derive(Debug)] +pub(crate) struct DropNotifier { + dropped: AtomicBool, +} + +impl DropNotifier { + pub(crate) fn new() -> Self { + Self { + dropped: AtomicBool::new(false), + } + } + + pub(crate) fn was_dropped(&self) -> bool { + self.dropped.load(Ordering::Relaxed) + } +} + +pub(crate) fn run_deferred() { + for _ in 0..65536 { + crossbeam_epoch::pin().flush(); + } +}