diff --git a/Cargo.toml b/Cargo.toml index bb82f60..98263b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cht" -version = "0.2.0" +version = "0.3.0" authors = ["Gregory Meyer "] edition = "2018" description = "Lockfree resizeable concurrent hash table." diff --git a/README.md b/README.md index 1245ef6..f73ac32 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ and deletions. In your `Cargo.toml`: ```toml -cht = "^0.2.0" +cht = "^0.3.0" ``` Then in your code: diff --git a/src/lib.rs b/src/lib.rs index 9eeaa9a..a8a8925 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ // MIT License // -// Copyright (c) 2019 Gregory Meyer +// Copyright (c) 2020 Gregory Meyer // // Permission is hereby granted, free of charge, to any person // obtaining a copy of this software and associated documentation files @@ -34,594 +34,3 @@ pub mod map; pub use map::HashMap; - -#[cfg(test)] -mod tests { - use super::*; - - use std::{ - sync::{Arc, Barrier}, - thread::{self, JoinHandle}, - }; - - #[test] - fn hash_map_insertion() { - const MAX_VALUE: i32 = 512; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, i), None); - - assert!(!map.is_empty()); - assert_eq!(map.len(), (i + 1) as usize); - - for j in 0..=i { - assert_eq!(map.get(&j), Some(j)); - assert_eq!(map.insert(j, j), Some(j)); - } - - for k in i + 1..MAX_VALUE { - assert_eq!(map.get(&k), None); - } - } - } - - #[test] - fn hash_map_growth() { - const MAX_VALUE: i32 = 512; - - let map = HashMap::new(); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, i), None); - - assert!(!map.is_empty()); - assert_eq!(map.len(), (i + 1) as usize); - - for j in 0..=i { - assert_eq!(map.get(&j), Some(j)); - assert_eq!(map.insert(j, j), Some(j)); - } - - for k in i + 1..MAX_VALUE { - assert_eq!(map.get(&k), None); - } - } - } - - #[test] - fn hash_map_concurrent_insertion() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - 
const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = Arc::new(HashMap::with_capacity(MAX_INSERTED_VALUE as usize)); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_concurrent_growth() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = Arc::new(HashMap::new()); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(|t| t.join()) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_removal() { - const MAX_VALUE: i32 = 512; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, i), None); - } - - for i in 0..MAX_VALUE { - assert_eq!(map.remove(&i), Some(i)); - } - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn 
hash_map_concurrent_removal() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.insert(i, i), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.remove(&j), Some(j)); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(|t| t.join()) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), 0); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn hash_map_concurrent_insertion_and_removal() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; - const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; - - let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.insert(i, i), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); - - let insert_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - let remove_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) - { - assert_eq!(map.remove(&j), Some(j)); - } - }) - }) - 
.collect(); - - for result in insert_threads - .into_iter() - .chain(remove_threads.into_iter()) - .map(|t| t.join()) - { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), INSERTED_MIDPOINT as usize); - - for i in 0..INSERTED_MIDPOINT { - assert_eq!(map.get(&i), Some(i)); - } - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn hash_map_concurrent_growth_and_removal() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; - const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; - - let map = HashMap::with_capacity(INSERTED_MIDPOINT as usize); - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.insert(i, i), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); - - let insert_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { - assert_eq!(map.insert(j, j), None); - } - }) - }) - .collect(); - - let remove_threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) - { - assert_eq!(map.remove(&j), Some(j)); - } - }) - }) - .collect(); - - for result in insert_threads - .into_iter() - .chain(remove_threads.into_iter()) - .map(JoinHandle::join) - { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), INSERTED_MIDPOINT as usize); - - for i in 0..INSERTED_MIDPOINT { - assert_eq!(map.get(&i), Some(i)); - } - - for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), None); - } - } - - #[test] - fn hash_map_modify() { - let map = HashMap::new(); - - 
assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - assert_eq!(map.modify("foo", |x| x * 2), None); - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - map.insert("foo", 1); - assert_eq!(map.modify("foo", |x| x * 2), Some(1)); - - assert!(!map.is_empty()); - assert_eq!(map.len(), 1); - - map.remove("foo"); - assert_eq!(map.modify("foo", |x| x * 2), None); - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - } - - #[test] - fn hash_map_concurrent_modification() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; - - let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - map.insert(i, i); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|i| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in (i as i32 * MAX_VALUE)..((i as i32 + 1) * MAX_VALUE) { - assert_eq!(map.modify(&j, |x| x * 2), Some(j)); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); - - for i in 0..MAX_INSERTED_VALUE { - assert_eq!(map.get(&i), Some(i * 2)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_modification() { - const MAX_VALUE: i32 = 512; - const NUM_THREADS: usize = 64; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.insert(i, 0), None); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for i in 0..MAX_VALUE { - assert!(map.modify(&i, |x| x + 1).is_some()); - } - }) - }) - .collect(); - 
- for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(!map.is_empty()); - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); - } - } - - #[test] - fn hash_map_insert_or_modify() { - let map = HashMap::new(); - - assert_eq!(map.insert_or_modify("foo", 1, |x| x + 1), None); - assert_eq!(map.get("foo"), Some(1)); - - assert_eq!(map.insert_or_modify("foo", 1, |x| x + 1), Some(1)); - assert_eq!(map.get("foo"), Some(2)); - } - - #[test] - fn hash_map_concurrent_insert_or_modify() { - const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = Arc::new(HashMap::new()); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - map.insert_or_modify(j, 1, |x| x + 1); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_insertion() { - const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = Arc::new(HashMap::with_capacity(MAX_VALUE as usize)); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - map.insert(j, j); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_growth() { - 
const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = Arc::new(HashMap::new()); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - map.insert(j, j); - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert_eq!(map.len(), MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), Some(i)); - } - } - - #[test] - fn hash_map_concurrent_overlapped_removal() { - const NUM_THREADS: usize = 64; - const MAX_VALUE: i32 = 512; - - let map = HashMap::with_capacity(MAX_VALUE as usize); - - for i in 0..MAX_VALUE { - map.insert(i, i); - } - - let map = Arc::new(map); - let barrier = Arc::new(Barrier::new(NUM_THREADS)); - - let threads: Vec<_> = (0..NUM_THREADS) - .map(|_| { - let map = map.clone(); - let barrier = barrier.clone(); - - thread::spawn(move || { - barrier.wait(); - - for j in 0..MAX_VALUE { - let prev_value = map.remove(&j); - - if let Some(v) = prev_value { - assert_eq!(v, j); - } - } - }) - }) - .collect(); - - for result in threads.into_iter().map(JoinHandle::join) { - assert!(result.is_ok()); - } - - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - - for i in 0..MAX_VALUE { - assert_eq!(map.get(&i), None); - } - } -} diff --git a/src/map.rs b/src/map.rs index 3e407fd..e0e7118 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,6 +1,6 @@ // MIT License // -// Copyright (c) 2019 Gregory Meyer +// Copyright (c) 2020 Gregory Meyer // // Permission is hereby granted, free of charge, to any person // obtaining a copy of this software and associated documentation files @@ -25,18 +25,21 @@ //! A lockfree concurrent hash map implemented with open addressing and linear //! probing. 
+mod bucket; + +#[cfg(test)] +mod tests; + +use bucket::{Bucket, BucketArray, InsertOrModifyState, KeyOrOwnedBucket}; + use std::{ borrow::Borrow, - hash::{BuildHasher, Hash, Hasher}, - mem::{self}, - sync::{ - atomic::{self, AtomicUsize, Ordering}, - Arc, - }, + hash::{BuildHasher, Hash}, + sync::atomic::{self, AtomicUsize, Ordering}, }; use ahash::RandomState; -use crossbeam_epoch::{self, Atomic, Guard, Owned, Shared}; +use crossbeam_epoch::{self, Atomic, CompareAndSetError, Guard, Owned, Shared}; /// Default hasher for `HashMap`. /// @@ -57,21 +60,18 @@ pub type DefaultHashBuilder = RandomState; /// The default hashing algorithm is [aHash], a hashing algorithm that is /// accelerated by the [AES-NI] instruction set on x86 proessors. aHash provides /// some resistance to DoS attacks, but will not provide the same level of -/// resistance as something like [`RandomState`] from the standard library. +/// resistance as something like [`RandomState`]. /// /// The hashing algorithm to be used can be chosen on a per-`HashMap` basis /// using the [`with_hasher`] and [`with_capacity_and_hasher`] methods. /// -/// Key types must implement [`Hash`] and [`Eq`]. Additionally, if you are going -/// to be removing elements from this `HashMap`, the key type must also -/// implement [`Clone`], as `HashMap` uses tombstones for deletion. Any -/// operations that return a value require the value type to implement -/// [`Clone`], as elements may be in use by other threads and as such cannot be -/// moved from. +/// Key types must implement [`Hash`] and [`Eq`]. Any operations that return a +/// key or value require the return types to implement [`Clone`], as elements +/// may be in use by other threads and as such cannot be moved from. /// /// `HashMap` is inspired by Jeff Phreshing's hash tables implemented in /// [Junction], described in [this blog post]. In short, `HashMap` supports -/// fully concurrent lookups, insertions, and removals. 
+/// fully concurrent lookups, insertions, removals, and updates. /// /// [aHash]: https://docs.rs/ahash /// [AES-NI]: https://en.wikipedia.org/wiki/AES_instruction_set @@ -85,17 +85,16 @@ pub type DefaultHashBuilder = RandomState; /// [this blog post]: https://preshing.com/20160222/a-resizable-concurrent-map/ #[derive(Default)] pub struct HashMap { - buckets: Atomic>, + bucket_array: Atomic>, + build_hasher: S, len: AtomicUsize, - hash_builder: Arc, } impl HashMap { /// Creates an empty `HashMap`. /// /// The hash map is created with a capacity of 0 and will not allocate any - /// space for elements until the first insertion. However, the hash builder - /// `S` will be allocated on the heap. + /// space for elements until the first insertion. pub fn new() -> HashMap { HashMap::with_capacity_and_hasher(0, DefaultHashBuilder::default()) } @@ -103,48 +102,36 @@ impl HashMap { /// Creates an empty `HashMap` with space for at least `capacity` elements /// without reallocating. /// - /// If `capacity == 0`, the hash map will not allocate any space for - /// elements, but it will allocate space for the hash builder. + /// If `capacity == 0`, no allocations will occur. pub fn with_capacity(capacity: usize) -> HashMap { HashMap::with_capacity_and_hasher(capacity, DefaultHashBuilder::default()) } } impl HashMap { - /// Creates an empty `HashMap` that will use `hash_builder` to hash keys. + /// Creates an empty `HashMap` that will use `build_hasher` to hash keys. /// /// The created map will have a capacity of 0 and as such will not have any - /// space for elements allocated until the first insertion. However, the - /// hash builder `S` will be allocated on the heap. - pub fn with_hasher(hash_builder: S) -> HashMap { - HashMap::with_capacity_and_hasher(0, hash_builder) + /// space for elements allocated until the first insertion. 
+ pub fn with_hasher(build_hasher: S) -> HashMap { + HashMap::with_capacity_and_hasher(0, build_hasher) } /// Creates an empty `HashMap` that will hold at least `capacity` elements - /// without reallocating and that uses `hash_builder` to hash keys. - /// - /// If `capacity == 0`, the hash map will not allocate any space for - /// elements. However, the hash map will always allocate its hash builder - /// `S` on the heap. - pub fn with_capacity_and_hasher(capacity: usize, hash_builder: S) -> HashMap { - let hash_builder = Arc::new(hash_builder); - - if capacity == 0 { - HashMap { - buckets: Atomic::null(), - hash_builder, - len: AtomicUsize::new(0), - } + /// without reallocating and that uses `build_hasher` to hash keys. + /// + /// If `capacity == 0`, no allocations will occur. + pub fn with_capacity_and_hasher(capacity: usize, build_hasher: S) -> HashMap { + let bucket_array = if capacity == 0 { + Atomic::null() } else { - HashMap { - buckets: Atomic::new(BucketArray::with_capacity_hasher_and_epoch( - capacity + 1, - hash_builder.clone(), - 0, - )), - hash_builder, - len: AtomicUsize::new(0), - } + Atomic::new(BucketArray::with_capacity(0, capacity)) + }; + + Self { + bucket_array, + build_hasher, + len: AtomicUsize::new(0), } } @@ -166,35 +153,29 @@ impl HashMap { } /// Returns the number of elements this `HashMap` can hold without - /// reallocating. + /// reallocating a table. + /// + /// Note that all mutating operations, with the exception of removing + /// elements, incur at least one allocation for the associated bucket. /// - /// If invoked while this hash map is growing, it is possible for - /// [`len`](#method.len) to return a greater value than this function does. - /// This is because new elements are being inserted into the next array of - /// buckets, but the `HashMap`'s bucket pointer has not been swung up the - /// list yet. 
+ /// If there are insertion operations in flight, it is possible that a + /// new, larger table has already been allocated. pub fn capacity(&self) -> usize { let guard = &crossbeam_epoch::pin(); - let buckets_ptr = self.buckets.load_consume(guard); - - if buckets_ptr.is_null() { - return 0; - } - let buckets_ref = unsafe { buckets_ptr.deref() }; + let bucket_array_ptr = self.bucket_array.load_consume(guard); - buckets_ref.buckets.len() / 2 + unsafe { bucket_array_ptr.as_ref() } + .map(BucketArray::capacity) + .unwrap_or(0) } /// Returns a copy of the value corresponding to `key`. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. In addition, `V` must implement [`Clone`], as - /// the value may be concurrently removed at any moment, so the best we can - /// do is return a copy of it. - /// - /// If your `V` does not implement [`Clone`], you will have to use - /// [`get_and`] instead. + /// *must* match that of `K`. `V` must implement [`Clone`], as the value may + /// be deleted at any moment; the best we can do is to clone it while we + /// know it exists. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html @@ -205,18 +186,15 @@ impl HashMap { K: Borrow, V: Clone, { - self.get_and(key, V::clone) + self.get_key_value_and(key, |_, v| v.clone()) } /// Returns a copy of the key and value corresponding to `key`. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. In addition, `K` and `V` must implement - /// [`Clone`], as the bucket may be concurrently removed at any moment, so - /// the best we can do is return a copy of it. - /// - /// If your `K` or `V` do not implement [`Clone`], you will have to use - /// [`get_key_value_and`] instead. + /// *must* match that of `K`.
`K` and `V` must implement [`Clone`], as the + /// bucket may be concurrently removed at any time; the best we can do is to + /// clone them while we know they exist. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html @@ -230,10 +208,10 @@ impl HashMap { self.get_key_value_and(key, |k, v| (k.clone(), v.clone())) } - /// Invokes `func` with a reference to the value corresponding to `key`. + /// Invokes `with_value` with a reference to the value corresponding to `key`. /// - /// `func` will only be invoked if there is a value associated with `key` - /// contained within this hash map. + /// `with_value` will only be invoked if there is a value associated with + /// `key` contained within this hash map. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` /// *must* match that of `K`. @@ -243,18 +221,18 @@ impl HashMap { pub fn get_and T, T>( &self, key: &Q, - func: F, + with_value: F, ) -> Option where K: Borrow, { - self.get_key_value_and(key, move |_, v| func(v)) + self.get_key_value_and(key, move |_, v| with_value(v)) } - /// Invokes `func` with a reference to the key and value corresponding to - /// `key`. + /// Invokes `with_entry` with a reference to the key and value corresponding + /// to `key`. /// - /// `func` will only be invoked if there is a value associated with `key` + /// `with_entry` will only be invoked if there is a value associated with `key` /// contained within this hash map. 
/// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` @@ -265,30 +243,55 @@ impl HashMap { pub fn get_key_value_and T, T>( &self, key: &Q, - func: F, + with_entry: F, ) -> Option where K: Borrow, { let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, key); + + let result; - self.get_bucket(key, guard) - .and_then(move |b| match &b.maybe_value { - Some(v) => Some(func(&b.key, v)), - None => None, - }) + loop { + match bucket_array_ref + .get(guard, hash, key) + .map(|p| unsafe { p.as_ref() }) + { + Ok(Some(Bucket { + key, + maybe_value: value, + })) => { + result = Some(with_entry(key, unsafe { &*value.as_ptr() })); + + break; + } + Ok(None) => { + result = None; + + break; + } + Err(_) => { + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } + } + } + + self.swing_bucket_array(guard, current_ref, bucket_array_ref); + + result } - /// Inserts a key-value pair into the hash map, then returns a copy of the - /// previous value associated with `key`. + /// Inserts a key-value pair, then returns a copy of the value previously + /// associated with `key`. /// /// If the key was not previously present in this hash map, [`None`] is /// returned. /// - /// `V` must implement [`Clone`] for this function, as it is possible that - /// other threads may still hold references to the value previously - /// associated with `key`. As such, the associated value cannot be moved - /// from. + /// `V` must implement [`Clone`], as other threads may hold references to + /// the associated value. 
/// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html @@ -296,18 +299,16 @@ impl HashMap { where V: Clone, { - self.insert_and(key, value, V::clone) + self.insert_entry_and(key, value, |_, v| v.clone()) } - /// Inserts a key-value pair into the hash map, then returns a copy of the - /// previous key-value pair. + /// Inserts a key-value pair, then returns a copy of the previous entry. /// /// If the key was not previously present in this hash map, [`None`] is /// returned. /// - /// `K` and `V` must implement [`Clone`] for this function, as it is - /// possible that other threads may still hold references to the key-value - /// pair previously associated with `key`. + /// `K` and `V` must implement [`Clone`], as other threads may hold + /// references to the entry. /// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html @@ -319,194 +320,88 @@ impl HashMap { self.insert_entry_and(key, value, |k, v| (k.clone(), v.clone())) } - /// Inserts a key-value pair into the hash map, then invokes `func` with the - /// previously-associated value. - /// - /// If the key was not previously present in this hash map, [`None`] is - /// returned and `func` is not invoked. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_and T, T>(&self, key: K, value: V, func: F) -> Option { - self.insert_entry_and(key, value, move |_, v| func(v)) - } - - /// Inserts a key-value pair into the hash map, then invokes `func` with the - /// new key and previously-associated value. + /// Inserts a key-value pair, then invokes `with_previous_value` with the + /// value previously associated with `key`. /// /// If the key was not previously present in this hash map, [`None`] is - /// returned and `func` is not invoked. 
+ /// returned and `with_previous_value` is not invoked. /// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_entry_and T, T>( + pub fn insert_and T, T>( &self, key: K, value: V, - func: F, + with_previous_value: F, ) -> Option { - let guard = &crossbeam_epoch::pin(); - - self.do_insert(key, value, guard).and_then(|bucket| { - bucket - .maybe_value - .as_ref() - .map(|previous_value| func(&bucket.key, previous_value)) - }) - } - - /// Insert the a new value if none is associated with `key` or replace the - /// value with the result of `on_modify`, then return a clone of the old - /// value. - /// - /// If there is no value associated with `key`, [`None`] will be returned - /// and `on_modify` will not be invoked. Otherwise, `on_modify` may be - /// invoked multiple times depending on how much write contention there is - /// on the bucket associated with `key`. - /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. - /// - /// `V` must implement [`Clone`] for this function, as it is possible that - /// other threads may still hold references to the value previously - /// associated with `key`. As such, the value previously associated with - /// `key` cannot be moved from. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html - pub fn insert_or_modify V>(&self, key: K, value: V, on_modify: F) -> Option - where - V: Clone, - { - self.insert_or_modify_and(key, value, on_modify, V::clone) - } - - /// Insert the result of `on_insert` if no value is associated with `key` or - /// replace the value with the result of `on_modify`, then return a clone of - /// the old value. 
- /// - /// If there is no value associated with `key`, `on_insert` will be invoked, - /// `on_modify` will not be invoked, and [`None`] will be returned. - /// Otherwise, `on_modify` may be invoked multiple times depending on how - /// much write contention there is on the bucket associate with `key`. - /// - /// It is possible for both `on_insert` and `on_modify` to be invoked, even - /// if [`None`] is returned, if other threads are also writing to the bucket - /// associated with `key`. - /// - /// `V` must implement [`Clone`] for this function, as it is possible that - /// other threads may still hold references to the value previously - /// associated with `key`. As such, the value previously associated with - /// `key` cannot be moved from. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html - pub fn insert_with_or_modify V, G: FnMut(&V) -> V>( - &self, - key: K, - on_insert: F, - on_modify: G, - ) -> Option - where - V: Clone, - { - self.insert_with_or_modify_and(key, on_insert, on_modify, V::clone) + self.insert_entry_and(key, value, move |_, v| with_previous_value(v)) } - /// Insert the a new value if none is associated with `key` or replace the - /// value with the result of `on_modify`, then return the result of - /// `with_old_value` using the old value. + /// Inserts a key-value pair, then invokes `with_previous_entry` with the + /// previous entry. /// - /// If there is no value associated with `key`, [`None`] will be returned - /// and `on_modify` and `with_old_value` will not be invoked. Otherwise, - /// `on_modify` may be invoked multiple times depending on how much write - /// contention there is on the bucket associated with `key`. - /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. 
+ /// If the key was not previously present in this hash map, [`None`] is + /// returned and `with_previous_entry` is not invoked. /// /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_or_modify_and V, G: FnOnce(&V) -> T, T>( + pub fn insert_entry_and T, T>( &self, key: K, value: V, - on_modify: F, - with_old_value: G, - ) -> Option { - self.insert_with_or_modify_and(key, move || value, on_modify, with_old_value) - } - - /// Insert the result of `on_insert` if no value is associated with `key` or - /// replace the value with the result of `on_modify`, then return the result - /// of `with_old_value` using the old value. - /// - /// If there is no value associated with `key`, `on_insert` will be invoked, - /// `on_modify` and `with_old_value`, will not be invoked, and [`None`] will - /// be returned. Otherwise, `on_modify` may be invoked multiple times - /// depending on how much write contention there is on the bucket associate - /// with `key`. - /// - /// It is possible for both `on_insert` and `on_modify` to be invoked, even - /// if [`None`] is returned, if other threads are also writing to the bucket - /// associated with `key`. 
- /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - pub fn insert_with_or_modify_and V, G: FnMut(&V) -> V, H: FnOnce(&V) -> T, T>( - &self, - key: K, - on_insert: F, - on_modify: G, - with_old_value: H, + with_previous_entry: F, ) -> Option { let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); + let mut bucket_ptr = Owned::new(Bucket::new(key, value)); - let hash = self.get_hash(&key); + let result; - let buckets_ptr = self.get_or_create_buckets(guard); - assert!(!buckets_ptr.is_null()); + loop { + while self.len.load(Ordering::Relaxed) > bucket_array_ref.capacity() { + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } - let buckets = unsafe { buckets_ptr.deref() }; + match bucket_array_ref.insert(guard, hash, bucket_ptr) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + if previous_bucket_ptr.tag() & bucket::TOMBSTONE_TAG != 0 { + self.len.fetch_add(1, Ordering::Relaxed); + result = None; + } else { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() })); + } - let BucketAndParent { - bucket: previous_bucket_ptr, - parent: new_buckets_ptr, - } = buckets.insert_or_modify( - KeyOrBucket::Key(key), - hash, - FunctionOrValue::Function(on_insert), - on_modify, - guard, - ); + unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) }; + } else { + self.len.fetch_add(1, Ordering::Relaxed); + result = None; + } - if new_buckets_ptr != buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); + break; + } + Err(p) => { + bucket_ptr = p; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } + } } - if !previous_bucket_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - 
atomic::fence(Ordering::Acquire); - mem::drop(previous_bucket_ptr.into_owned()); - }) - }; - } else { - self.len.fetch_add(1, Ordering::Relaxed); - } + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - unsafe { previous_bucket_ptr.as_ref() } - .and_then(move |b| b.maybe_value.as_ref().map(with_old_value)) + result } -} -impl HashMap { - /// Removes the value associated with `key` from the hash map, returning a - /// copy of that value if there was one contained in this hash map. + /// If there is a value associated with `key`, remove and return a copy of + /// it. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. `K` and `V` must implement [`Clone`] for this - /// function, as `K` must be cloned for the tombstone bucket and the - /// previously-associated value cannot be moved from, as other threads - /// may still hold references to it. + /// *must* match that of `K`. `V` must implement [`Clone`], as other + /// threads may hold references to the associated value. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html @@ -516,1019 +411,684 @@ impl HashMap { K: Borrow, V: Clone, { - self.remove_and(key, V::clone) + self.remove_entry_if_and(key, |_, _| true, |_, v| v.clone()) } - /// Removes the value associated with `key` from the hash map, returning a - /// copy of that key-value pair it was contained in this hash map. + /// If there is a value associated with `key`, remove it and return a copy + /// of the previous entry. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. `K` and `V` must implement [`Clone`] for this - /// function. `K` must be cloned twice: once for the tombstone bucket - /// and once for the return value; the previously-associated value cannot be - /// moved from, as other threads may still hold references to it. 
+ /// *must* match that of `K`. `K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html pub fn remove_entry(&self, key: &Q) -> Option<(K, V)> where - K: Borrow, + K: Borrow + Clone, V: Clone, { - self.remove_entry_and(key, |k, v| (k.clone(), v.clone())) + self.remove_entry_if_and(key, |_, _| true, |k, v| (k.clone(), v.clone())) } - /// Removes the value associated with `key` from the hash map, then returns - /// the result of invoking `func` with the previously-associated value. + /// If there is a value associated with `key`, remove it and return the + /// result of invoking `with_previous_value` with that value. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. `K` must implement [`Clone`] for this - /// function, as `K` must be cloned to create a tombstone bucket. + /// *must* match that of `K`. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html pub fn remove_and T, T>( &self, key: &Q, - func: F, + with_previous_value: F, ) -> Option where K: Borrow, { - self.remove_entry_and(key, move |_, v| func(v)) + self.remove_entry_if_and(key, |_, _| true, move |_, v| with_previous_value(v)) } - /// Removes the value associated with `key` from the hash map, then returns - /// the result of invoking `func` with the key and previously-associated - /// value. + /// If there is a value associated with `key`, remove it and return the + /// result of invoking `with_previous_entry` with that entry. /// /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` - /// *must* match that of `K`. 
`K` must implement [`Clone`] for this - /// function, as `K` must be cloned to create a tombstone bucket. + /// *must* match that of `K`. /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html pub fn remove_entry_and T, T>( &self, key: &Q, - func: F, + with_previous_entry: F, ) -> Option where K: Borrow, { - let guard = &crossbeam_epoch::pin(); - - self.do_remove(key, guard) - .and_then(|bucket| bucket.maybe_value.as_ref().map(|v| func(&bucket.key, v))) + self.remove_entry_if_and(key, |_, _| true, with_previous_entry) } - /// Replace the value associated with `key` with the result of `on_modify` - /// and return a clone of the previous value. - /// - /// If there is no value associated with `key`, `on_modify` will not be - /// invoked and [`None`] will be returned. Otherwise, `on_modify` may be - /// invoked multiple times depending on how much write contention there is - /// on the bucket associate with `key`. + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove and return a copy of its + /// value. /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. + /// `condition` may be invoked one or more times, even if no entry was + /// removed. /// - /// `K` must implement [`Clone`] for this function to create a new bucket - /// if one already exists. `V` must implement [`Clone`] for this function, - /// as it is possible that other threads may still hold references to the - /// value previously associated with `key`. As such, the value previously - /// associated with `key` cannot be moved from. `Q` can be any borrowed form - /// of `K`, but [`Hash`] and [`Eq`] on `Q` *must* match that of `K`. 
+ /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. `K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - pub fn modify V>( + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn remove_if bool>( &self, key: &Q, - on_modify: F, + condition: F, ) -> Option where K: Borrow, V: Clone, { - self.modify_and(key, on_modify, V::clone) + self.remove_entry_if_and(key, condition, move |_, v| v.clone()) } - /// Replace the value associated with `key` with the result of `on_modify` - /// and return the result of invoking `with_old_value` on the old value. + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove and return a copy of it. /// - /// If there is no value associated with `key`, `on_modify` and - /// `with_old_value` will not be invoked and [`None`] will be returned. - /// Otherwise, `on_modify` may be invoked multiple times depending on how - /// much write contention there is on the bucket associate with `key`. + /// `condition` may be invoked one or more times, even if no entry was + /// removed. /// - /// It is possible for `on_modify` to be invoked even if [`None`] is - /// returned if other threads are also writing to the bucket associated with - /// `key`. - /// - /// `K` must implement [`Clone`] for this function to create a new bucket - /// if one already exists. `Q` can be any borrowed form of `K`, but - /// [`Hash`] and [`Eq`] on `Q` *must* match that of `K`. + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
`K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn remove_entry_if bool>( + &self, + key: &Q, + condition: F, + ) -> Option<(K, V)> + where + K: Clone + Borrow, + V: Clone, + { + self.remove_entry_if_and(key, condition, move |k, v| (k.clone(), v.clone())) + } + + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove it and return the result of + /// invoking `with_previous_value` with its value. + /// + /// `condition` may be invoked one or more times, even if no entry was + /// removed. If `condition` failed or there was no value associated with + /// `key`, `with_previous_value` is not invoked and [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
+ /// /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html - pub fn modify_and V, G: FnOnce(&V) -> T, T>( + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn remove_if_and bool, G: FnOnce(&V) -> T, T>( &self, key: &Q, - on_modify: F, - with_old_value: G, + condition: F, + with_previous_value: G, ) -> Option where K: Borrow, { - let guard = &crossbeam_epoch::pin(); - - self.do_modify(key, on_modify, guard) - .and_then(move |b| b.maybe_value.as_ref().map(with_old_value)) + self.remove_entry_if_and(key, condition, move |_, v| with_previous_value(v)) } -} -impl<'g, K: Hash + Eq, V, S: 'g + BuildHasher> HashMap { - fn get_bucket( + /// If there is a value associated with `key` and `condition` returns true + /// when invoked with the current entry, remove it and return the result of + /// invoking `with_previous_entry` with it. + /// + /// `condition` may be invoked one or more times, even if no entry was + /// removed. If `condition` failed or there was no value associated with + /// `key`, `with_previous_entry` is not invoked and [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
+ /// + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn remove_entry_if_and< + Q: Hash + Eq + ?Sized, + F: FnMut(&K, &V) -> bool, + G: FnOnce(&K, &V) -> T, + T, + >( &self, key: &Q, - guard: &'g Guard, - ) -> Option<&'g Bucket> + mut condition: F, + with_previous_entry: G, + ) -> Option where K: Borrow, { - let hash = self.get_hash(&key); + let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); - let buckets_ptr = self.buckets.load_consume(guard); + let result; - if buckets_ptr.is_null() { - return None; - } + loop { + match bucket_array_ref.remove_if(guard, hash, key, condition) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + self.len.fetch_sub(1, Ordering::Relaxed); + result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() })); - let buckets = unsafe { buckets_ptr.deref() }; - let (found_bucket_ptr, new_buckets_ptr) = buckets.get(key, hash, guard); + unsafe { bucket::defer_destroy_tombstone(guard, previous_bucket_ptr) }; + } else { + result = None; + } - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); + break; + } + Err(c) => { + condition = c; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } + } } - if let Some(found_bucket) = unsafe { found_bucket_ptr.as_ref() } { - assert!(found_bucket.key.borrow() == key); + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - Some(found_bucket) - } else { - None - } + result } - fn do_insert(&self, key: K, value: V, guard: &'g Guard) -> Option<&'g Bucket> { - let hash = self.get_hash(&key); - - let 
buckets_ptr = self.get_or_create_buckets(guard); - assert!(!buckets_ptr.is_null()); - let buckets = unsafe { buckets_ptr.deref() }; - - let new_bucket = Owned::new(Bucket { - key, - maybe_value: Some(value), - }) - .into_shared(guard); - - let (previous_bucket_ptr, new_buckets_ptr) = buckets.insert(new_bucket, hash, guard); - - // increment length if we replaced a null or tombstone bucket - if unsafe { previous_bucket_ptr.as_ref() } - .map(|b| b.maybe_value.is_none()) - .unwrap_or(true) - { - self.len.fetch_add(1, Ordering::Relaxed); - } - - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); - } - - if !previous_bucket_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(previous_bucket_ptr.into_owned()); - }) - }; - } - - unsafe { previous_bucket_ptr.as_ref() } + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return a copy of the previously associated value. + /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// + /// `V` must implement [`Clone`], as other threads may hold references to + /// the associated value. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_or_modify V>( + &self, + key: K, + value: V, + on_modify: F, + ) -> Option + where + V: Clone, + { + self.insert_with_or_modify_entry_and(key, move || value, on_modify, |_, v| v.clone()) } - fn do_remove( + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return a copy of the previous entry. 
+ /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// + /// `K` and `V` must implement [`Clone`], as other threads may hold + /// references to the entry. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_or_modify_entry V>( &self, - key: &Q, - guard: &'g Guard, - ) -> Option<&'g Bucket> + key: K, + value: V, + on_modify: F, + ) -> Option<(K, V)> where - K: Borrow + Clone, + K: Clone, + V: Clone, { - let buckets_ptr = self.buckets.load_consume(guard); - - if buckets_ptr.is_null() { - return None; - } - - let buckets_ref = unsafe { buckets_ptr.deref() }; - let hash = self.get_hash(key); - - let (removed_ptr, new_buckets_ptr) = buckets_ref.remove(key, hash, None, guard); - - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); - } - - if !removed_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(removed_ptr.into_owned()); - }) - }; - - self.len.fetch_sub(1, Ordering::Relaxed); - } - - unsafe { removed_ptr.as_ref() } + self.insert_with_or_modify_entry_and( + key, + move || value, + on_modify, + |k, v| (k.clone(), v.clone()), + ) } - fn do_modify V>( + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return a copy of the previously + /// associated value. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked + /// and [`None`] will be returned. `on_modify` may be invoked multiple + /// times, even if [`None`] is returned. Similarly, `on_insert` may be + /// invoked if [`Some`] is returned. 
+ /// + /// `V` must implement [`Clone`], as other threads may hold references to + /// the associated value. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_with_or_modify V, G: FnMut(&K, &V) -> V>( &self, - key: &Q, - modifier: F, - guard: &'g Guard, - ) -> Option<&'g Bucket> + key: K, + on_insert: F, + on_modify: G, + ) -> Option where - K: Borrow + Clone, + V: Clone, { - let buckets_ptr = self.buckets.load_consume(guard); - - if buckets_ptr.is_null() { - return None; - } - - let buckets = unsafe { buckets_ptr.deref() }; - let hash = self.get_hash(key); - - let (previous_bucket_ptr, new_buckets_ptr) = - buckets.modify(key, hash, modifier, None, guard); - - if !previous_bucket_ptr.is_null() { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(previous_bucket_ptr.into_owned()); - }) - }; - } - - if buckets_ptr != new_buckets_ptr { - self.swing_bucket_array_ptr(buckets_ptr, new_buckets_ptr, guard); - } - - unsafe { previous_bucket_ptr.as_ref() } + self.insert_with_or_modify_entry_and(key, on_insert, on_modify, |_, v| v.clone()) } - fn get_hash(&self, key: &Q) -> u64 { - let mut hasher = self.hash_builder.build_hasher(); - key.hash(&mut hasher); - - hasher.finish() + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return a copy of the previous + /// entry. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked + /// and [`None`] will be returned. `on_modify` may be invoked multiple + /// times, even if [`None`] is returned. Similarly, `on_insert` may be + /// invoked if [`Some`] is returned. 
+ /// + /// `K` and `V` must implement [`Clone`], as other threads may hold + /// references to the entry. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn insert_with_or_modify_entry V, G: FnMut(&K, &V) -> V>( + &self, + key: K, + on_insert: F, + on_modify: G, + ) -> Option<(K, V)> + where + K: Clone, + V: Clone, + { + self.insert_with_or_modify_entry_and(key, on_insert, on_modify, |k, v| { + (k.clone(), v.clone()) + }) } - fn get_or_create_buckets(&self, guard: &'g Guard) -> Shared<'g, BucketArray> { - const DEFAULT_CAPACITY: usize = 64; - - let mut buckets_ptr = self.buckets.load_consume(guard); - let mut maybe_new_buckets = None; - - loop { - if buckets_ptr.is_null() { - let new_buckets = match maybe_new_buckets.take() { - Some(b) => b, - None => Owned::new(BucketArray::with_capacity_hasher_and_epoch( - DEFAULT_CAPACITY, - self.hash_builder.clone(), - 0, - )), - }; - - match self.buckets.compare_and_set_weak( - Shared::null(), - new_buckets, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(new_buckets) => return new_buckets, - Err(e) => { - maybe_new_buckets = Some(e.new); - buckets_ptr = self.buckets.load_consume(guard); - } - } - } else { - return buckets_ptr; - } - } - } - - fn swing_bucket_array_ptr( + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return the result of invoking `with_old_value` with + /// the previously associated value. + /// + /// If there is no value associated with `key`, `with_old_value` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn insert_or_modify_and V, G: FnOnce(&V) -> T, T>( &self, - mut current_ptr: Shared<'g, BucketArray>, - new_ptr: Shared<'g, BucketArray>, - guard: &'g Guard, - ) { - assert!(!current_ptr.is_null()); - assert!(!new_ptr.is_null()); - - let minimum_epoch = unsafe { new_ptr.deref() }.epoch; - let mut current = unsafe { current_ptr.deref() }; - - while current.epoch < minimum_epoch { - let next_ptr = current.next_array.load_consume(guard); - assert!(!next_ptr.is_null()); - - match self.buckets.compare_and_set( - current_ptr, - next_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => { - unsafe { - guard.defer_unchecked(move || { - atomic::fence(Ordering::Acquire); - mem::drop(current_ptr.into_owned()); - }); - } - - current_ptr = next_ptr; - } - Err(_) => current_ptr = self.buckets.load_consume(guard), - } - - current = unsafe { current_ptr.deref() }; - } - } -} - -impl Drop for HashMap { - fn drop(&mut self) { - let guard = unsafe { crossbeam_epoch::unprotected() }; - - let mut buckets_ptr = self.buckets.load_consume(guard); - - while !buckets_ptr.is_null() { - let this_bucket_array = unsafe { buckets_ptr.deref() }; - let new_buckets_ptr = this_bucket_array.next_array.load_consume(guard); - - for this_bucket in this_bucket_array.buckets.iter() { - let this_bucket_ptr = this_bucket.load_consume(guard); - - if this_bucket_ptr.is_null() || this_bucket_ptr.tag().has_redirect() { - continue; - } - - mem::drop(unsafe { this_bucket_ptr.into_owned() }); - } - - mem::drop(unsafe { buckets_ptr.into_owned() }); - buckets_ptr = new_buckets_ptr; - } - } -} - -struct BucketArray { - buckets: Vec>>, // len() is a power of 2 - len: AtomicUsize, - next_array: Atomic>, - hash_builder: Arc, - epoch: u64, -} - -impl BucketArray { - fn with_capacity_hasher_and_epoch( - capacity: usize, - hash_builder: Arc, - epoch: u64, - ) -> BucketArray { - BucketArray { - buckets: 
vec![Atomic::null(); (capacity * 2).next_power_of_two()], - len: AtomicUsize::new(0), - next_array: Atomic::null(), - hash_builder, - epoch, - } - } - - fn get_hash(&self, key: &K) -> u64 { - let mut hasher = self.hash_builder.build_hasher(); - key.hash(&mut hasher); - - hasher.finish() - } -} - -// set on bucket pointers if this bucket has been moved into the next array -const REDIRECT_TAG: usize = 1; - -// this might seem complex -- you're right, but i wanted to add another tag -// it ended up not being useful, but the code was already written... -trait Tag { - fn has_redirect(self) -> bool; - fn with_redirect(self) -> Self; - fn without_redirect(self) -> Self; -} - -impl Tag for usize { - fn has_redirect(self) -> bool { - (self & REDIRECT_TAG) != 0 - } - - fn with_redirect(self) -> Self { - self | REDIRECT_TAG + key: K, + value: V, + on_modify: F, + with_old_value: G, + ) -> Option { + self.insert_with_or_modify_entry_and( + key, + move || value, + on_modify, + move |_, v| with_old_value(v), + ) } - fn without_redirect(self) -> Self { - self & !REDIRECT_TAG + /// Insert a value if none is associated with `key`. Otherwise, replace the + /// value with the result of `on_modify` with the current entry as + /// arguments. Finally, return the result of invoking `with_old_entry` with + /// the previous entry. + /// + /// If there is no value associated with `key`, `with_old_entry` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + pub fn insert_or_modify_entry_and V, G: FnOnce(&K, &V) -> T, T>( + &self, + key: K, + value: V, + on_modify: F, + with_old_entry: G, + ) -> Option { + self.insert_with_or_modify_entry_and(key, move || value, on_modify, with_old_entry) } -} -impl<'g, K: Hash + Eq, V, S: BuildHasher> BucketArray { - fn get( + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return the result of invoking + /// `with_old_value` with the previously associated value. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked, + /// `with_old_value` will not be invoked, and [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// Similarly, `on_insert` may be invoked if [`Some`] is returned. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + pub fn insert_with_or_modify_and< + F: FnOnce() -> V, + G: FnMut(&K, &V) -> V, + H: FnOnce(&V) -> T, + T, + >( &self, - key: &Q, - hash: u64, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) - where - K: Borrow, - { - let self_ptr = (self as *const BucketArray).into(); - - let capacity = self.buckets.len(); - let offset = (hash & (self.buckets.len() - 1) as u64) as usize; - - for this_bucket_ptr in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - .map(|this_bucket| this_bucket.load_consume(guard)) - { - if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { - if this_bucket_ref.key.borrow() != key { - continue; - } else if !this_bucket_ptr.tag().has_redirect() { - return (this_bucket_ptr, self_ptr); - } - - // consume load from this_bucket isn't 
strong enough to publish - // writes to *self.next_array - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); - - return next_array.get(key, hash, guard); - } else { - return (Shared::null(), self_ptr); - } - } - - (Shared::null(), self_ptr) + key: K, + on_insert: F, + on_modify: G, + with_old_value: H, + ) -> Option { + self.insert_with_or_modify_entry_and(key, on_insert, on_modify, move |_, v| { + with_old_value(v) + }) } - fn insert( + /// Insert the result of `on_insert` if no value is associated with `key`. + /// Otherwise, replace the value with the result of `on_modify` with the + /// current entry as arguments. Finally, return the result of invoking + /// `with_old_entry` with the previous entry. + /// + /// If there is no value associated with `key`, `on_insert` will be invoked, + /// `with_old_entry` will not be invoked, and [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// Similarly, `on_insert` may be invoked if [`Some`] is returned. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some + pub fn insert_with_or_modify_entry_and< + F: FnOnce() -> V, + G: FnMut(&K, &V) -> V, + H: FnOnce(&K, &V) -> T, + T, + >( &self, - bucket_ptr: Shared<'g, Bucket>, - hash: u64, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) { - assert!(!bucket_ptr.is_null()); - - let bucket = unsafe { bucket_ptr.deref() }; - let capacity = self.buckets.len(); - let len = self.len.load(Ordering::Relaxed); - - // grow if inserting would push us over a load factor of 0.5 - if (len + 1) > (capacity / 2) { - let next_array = self.grow(guard); - - return next_array.insert(bucket_ptr, hash, guard); - } - - let grow_into_and_insert_into_next = || { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - // self.grow_into(next_array, guard); - - next_array.insert(bucket_ptr, hash, guard) - }; - - let offset = (hash & (capacity - 1) as u64) as usize; - let mut have_seen_redirect = false; + key: K, + on_insert: F, + mut on_modify: G, + with_old_entry: H, + ) -> Option { + let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); + let mut state = InsertOrModifyState::New(key, on_insert); - for this_bucket in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - loop { - let this_bucket_ptr = this_bucket.load_consume(guard); + let result; - have_seen_redirect = have_seen_redirect || this_bucket_ptr.tag().has_redirect(); + loop { + while self.len.load(Ordering::Relaxed) > bucket_array_ref.capacity() { + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); + } - let should_increment_len = - if let Some(this_bucket_ref) = unsafe { 
this_bucket_ptr.as_ref() } { - if this_bucket_ref.key != bucket.key { - break; + match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + if previous_bucket_ptr.tag() & bucket::TOMBSTONE_TAG != 0 { + self.len.fetch_add(1, Ordering::Relaxed); + result = None; + } else { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + result = Some(with_old_entry(key, unsafe { &*value.as_ptr() })); } - this_bucket_ref.is_tombstone() + unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) }; } else { - true - }; - - if this_bucket_ptr.tag().has_redirect() { - return grow_into_and_insert_into_next(); - } - - if this_bucket - .compare_and_set_weak( - this_bucket_ptr, - bucket_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) - .is_ok() - { - if should_increment_len { - // replaced a tombstone self.len.fetch_add(1, Ordering::Relaxed); + result = None; } - return ( - this_bucket_ptr, - (self as *const BucketArray).into(), - ); + break; + } + Err((s, f)) => { + state = s; + on_modify = f; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); } } } - if have_seen_redirect { - grow_into_and_insert_into_next() - } else { - let next_array = self.grow(guard); + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - next_array.insert(bucket_ptr, hash, guard) - } + result } - fn remove( - &self, - key: &Q, - hash: u64, - mut maybe_new_bucket: Option>>, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return a copy + /// of the previously associated value. + /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. 
+ /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. `V` must implement [`Clone`], as other + /// threads may hold references to the associated value. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn modify V>(&self, key: K, on_modify: F) -> Option where - K: Clone + Borrow, + V: Clone, { - let self_shared = (self as *const BucketArray).into(); - - let capacity = self.buckets.len(); - let offset = (hash & (capacity - 1) as u64) as usize; - - for this_bucket in (0..self.buckets.len()) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { - if this_bucket_ref.key.borrow() != key { - // hash collision - continue; - } - } - - loop { - if this_bucket_ptr.tag().has_redirect() { - let next_array = unsafe { self.get_next_unchecked(guard) }; - - return next_array.remove(key, hash, maybe_new_bucket, guard); - } - - let this_bucket_ref = - if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { - if this_bucket_ref.is_tombstone() { - return (Shared::null(), self_shared); - } - - this_bucket_ref - } else { - return (Shared::null(), self_shared); - }; - - let new_bucket = match maybe_new_bucket.take() { - Some(b) => b, - None => Owned::new(Bucket { - key: this_bucket_ref.key.clone(), - maybe_value: None, - }), - }; - - match this_bucket.compare_and_set_weak( - this_bucket_ptr, - new_bucket, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => { - self.len.fetch_sub(1, Ordering::Relaxed); - - return (this_bucket_ptr, self_shared); - } - Err(e) => { - maybe_new_bucket = Some(e.new); - let 
current_bucket_ptr = this_bucket.load_consume(guard); - - // check is only necessary once -- keys never change - // after being inserted/removed - if this_bucket_ptr.is_null() - && unsafe { current_bucket_ptr.as_ref() } - .map(|b| b.key.borrow() != key) - .unwrap_or(false) - { - break; - } - - this_bucket_ptr = current_bucket_ptr; - } - } - } - } - - (Shared::null(), self_shared) + self.modify_entry_and(key, on_modify, |_, v| v.clone()) } - fn modify V>( - &self, - key: &Q, - hash: u64, - mut modifier: F, - maybe_new_bucket_ptr: Option>>, - guard: &'g Guard, - ) -> (Shared<'g, Bucket>, Shared<'g, BucketArray>) + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return a copy + /// of the previous entry. + /// + /// If there is no value associated with `key`, [`None`] will be returned. + /// `on_modify` may be invoked multiple times, even if [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. `K` and `V` must implement [`Clone`], as other + /// threads may hold references to the entry. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html + pub fn modify_entry V>(&self, key: K, on_modify: F) -> Option<(K, V)> where - K: Clone + Borrow, + K: Clone, + V: Clone, { - let self_shared: Shared<'g, BucketArray> = (self as *const Self).into(); - - let capacity = self.buckets.len(); - let offset = (hash & (self.buckets.len() - 1) as u64) as usize; - - for this_bucket in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - let mut this_bucket_ref = match unsafe { this_bucket_ptr.as_ref() } { - Some(b) => { - // buckets will never have their key changed, so we only have to - // make this check once - if b.key.borrow() != key { - continue; - } - - b - } - None => return (Shared::null(), self_shared), - }; - - let mut new_bucket_ptr = maybe_new_bucket_ptr.unwrap_or_else(|| { - Owned::new(Bucket { - key: this_bucket_ref.key.clone(), - maybe_value: None, - }) - }); - - loop { - if this_bucket_ptr.tag().has_redirect() { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); - - return next_array.modify(key, hash, modifier, Some(new_bucket_ptr), guard); - } - - let old_value = match this_bucket_ref.maybe_value.as_ref() { - Some(v) => v, - None => return (Shared::null(), self_shared), - }; - - new_bucket_ptr.maybe_value = Some(modifier(old_value)); - - // i assume that a strong CAS is less expensive than invoking - // modifier a second time - match this_bucket.compare_and_set( - this_bucket_ptr, - new_bucket_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => return (this_bucket_ptr, 
self_shared), - Err(e) => { - new_bucket_ptr = e.new; - - this_bucket_ptr = this_bucket.load_consume(guard); - assert!(!this_bucket_ptr.is_null()); - - this_bucket_ref = unsafe { this_bucket_ptr.deref() }; - } - } - } - } - - (Shared::null(), self_shared) + self.modify_entry_and(key, on_modify, |k, v| (k.clone(), v.clone())) } - fn insert_or_modify V, F: FnMut(&V) -> V>( + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return the + /// result of invoking `with_old_value` with the previously associated + /// value. + /// + /// If there is no value associated with `key`, `with_old_value` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + pub fn modify_and V, G: FnOnce(&V) -> T, T>( &self, - mut key_or_bucket: KeyOrBucket, - hash: u64, - mut inserter: FunctionOrValue, - mut modifier: F, - guard: &'g Guard, - ) -> BucketAndParent<'g, K, V, S> { - let self_shared = (self as *const BucketArray).into(); - - let capacity = self.buckets.len(); - let len = self.len.load(Ordering::Relaxed); - - let insert_into_next = |key_or_bucket, inserter, modifier| { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); - - next_array.insert_or_modify(key_or_bucket, hash, inserter, modifier, guard) - }; - - // grow if inserting would push us over a load factor of 0.5 - if (len + 1) > (capacity / 2) { - let next_array = self.grow(guard); - - return 
next_array.insert_or_modify(key_or_bucket, hash, inserter, modifier, guard); - } - - let offset = (hash & (capacity - 1) as u64) as usize; - let mut have_seen_redirect = false; - - for this_bucket in (0..capacity) - .map(|x| (x + offset) & (capacity - 1)) - .map(|i| &self.buckets[i]) - { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - have_seen_redirect = have_seen_redirect || this_bucket_ptr.tag().has_redirect(); - - // if the bucket pointer is non-null and its key does not match, move to the next bucket - if !this_bucket_ptr.tag().has_redirect() - && unsafe { this_bucket_ptr.as_ref() } - .map(|this_bucket_ref| &this_bucket_ref.key != key_or_bucket.as_key()) - .unwrap_or(false) - { - continue; - } + key: K, + on_modify: F, + with_old_value: G, + ) -> Option { + self.modify_entry_and(key, on_modify, move |_, v| with_old_value(v)) + } - loop { - if this_bucket_ptr.tag().has_redirect() { - return insert_into_next(key_or_bucket, inserter, modifier); - } + /// If there is a value associated with `key`, replace it with the result of + /// invoking `on_modify` using the current key and value, then return the + /// result of invoking `with_old_value` with the previous entry. + /// + /// If there is no value associated with `key`, `with_old_value` will not be + /// invoked and [`None`] will be returned. `on_modify` may be invoked + /// multiple times, even if [`None`] is returned. + /// + /// `Q` can be any borrowed form of `K`, but [`Hash`] and [`Eq`] on `Q` + /// *must* match that of `K`. 
+ /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`Hash`]: https://doc.rust-lang.org/std/hash/trait.Hash.html + /// [`Eq`]: https://doc.rust-lang.org/std/cmp/trait.Eq.html + pub fn modify_entry_and V, G: FnOnce(&K, &V) -> T, T>( + &self, + key: K, + mut on_modify: F, + with_old_entry: G, + ) -> Option { + let guard = &crossbeam_epoch::pin(); + let current_ref = self.bucket_array(guard); + let mut bucket_array_ref = current_ref; + let hash = bucket::hash(&self.build_hasher, &key); + let mut key_or_owned_bucket = KeyOrOwnedBucket::Key(key); - let new_value = unsafe { this_bucket_ptr.as_ref() } - .and_then(|this_bucket_ref| this_bucket_ref.maybe_value.as_ref()) - .map(&mut modifier) - .unwrap_or_else(|| inserter.into_value()); - let new_bucket_ptr = key_or_bucket.into_bucket_with_value(new_value); - - // i assume it is more expensive to invoke modifier than it is - // to strong CAS - match this_bucket.compare_and_set( - this_bucket_ptr, - new_bucket_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => { - if this_bucket_ptr.is_null() { - self.len.fetch_add(1, Ordering::Relaxed); - } + let result; - return BucketAndParent { - bucket: this_bucket_ptr, - parent: self_shared, - }; + loop { + match bucket_array_ref.modify(guard, hash, key_or_owned_bucket, on_modify) { + Ok(previous_bucket_ptr) => { + if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } { + let Bucket { + key, + maybe_value: value, + } = previous_bucket_ref; + result = Some(with_old_entry(key, unsafe { &*value.as_ptr() })); + + unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) }; + } else { + result = None; } - Err(mut e) => { - inserter = FunctionOrValue::Value(e.new.maybe_value.take().unwrap()); - key_or_bucket = KeyOrBucket::Bucket(e.new); - - have_seen_redirect = have_seen_redirect || e.current.tag().has_redirect(); - - // if another thread inserted into this bucket, check to see if its key - // 
matches the one we are trying to insert/modify - if this_bucket_ptr.is_null() - && !e.current.tag().has_redirect() - && unsafe { e.current.as_ref() } - .map(|this_bucket_ref| { - &this_bucket_ref.key != key_or_bucket.as_key() - }) - .unwrap_or(false) - { - continue; - } - this_bucket_ptr = this_bucket.load_consume(guard); - } + break; + } + Err((kb, f)) => { + key_or_owned_bucket = kb; + on_modify = f; + bucket_array_ref = bucket_array_ref.rehash(guard, &self.build_hasher); } } } - let next_array = if have_seen_redirect { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - - let next_array = unsafe { next_array_ptr.deref() }; - self.grow_into(next_array, guard); + self.swing_bucket_array(guard, current_ref, bucket_array_ref); - next_array - } else { - self.grow(guard) - }; - - next_array.insert_or_modify(key_or_bucket, hash, inserter, modifier, guard) + result } +} - fn grow(&self, guard: &'g Guard) -> &'g BucketArray { - let maybe_next_array_ptr = self.next_array.load_consume(guard); - - if let Some(next_array) = unsafe { maybe_next_array_ptr.as_ref() } { - self.grow_into(next_array, guard); +impl HashMap { + fn bucket_array<'g>(&self, guard: &'g Guard) -> &'g BucketArray { + const DEFAULT_CAPACITY: usize = 64; - return next_array; - } + let mut maybe_new_bucket_array = None; - let allocated_array_ptr = Owned::new(BucketArray::with_capacity_hasher_and_epoch( - self.buckets.len() * 2, - self.hash_builder.clone(), - self.epoch + 1, - )); - - let next_array_ptr = match self.next_array.compare_and_set( - Shared::null(), - allocated_array_ptr, - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(next_array_ptr) => next_array_ptr, - Err(_) => self.next_array.load_consume(guard), - }; + loop { + let bucket_array_ptr = self.bucket_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - let next_array = unsafe { next_array_ptr.deref() }; + if let Some(bucket_array_ref) = unsafe { bucket_array_ptr.as_ref() 
} { + return bucket_array_ref; + } - self.grow_into(next_array, guard); + let new_bucket_array = maybe_new_bucket_array + .unwrap_or_else(|| Owned::new(BucketArray::with_capacity(0, DEFAULT_CAPACITY))); - next_array + match self.bucket_array.compare_and_set_weak( + Shared::null(), + new_bucket_array, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(b) => return unsafe { b.as_ref() }.unwrap(), + Err(CompareAndSetError { new, .. }) => maybe_new_bucket_array = Some(new), + } + } } - fn grow_into(&self, next_array: &'g BucketArray, guard: &'g Guard) { - for this_bucket in self.buckets.iter() { - let mut this_bucket_ptr = this_bucket.load_consume(guard); - - // if we insert a bucket that is then tombstone-d, we need to - // insert into the new bucket arrays - let mut maybe_hash = None; - - loop { - if this_bucket_ptr.tag().has_redirect() { - break; - } - - // if we already inserted this bucket, or if this bucket is - // non-null and not a tombstone - if maybe_hash.is_some() - || !unsafe { this_bucket_ptr.as_ref() } - .map(Bucket::is_tombstone) - .unwrap_or(true) - { - assert!(!this_bucket_ptr.is_null()); + fn swing_bucket_array<'g>( + &self, + guard: &'g Guard, + mut current_ref: &'g BucketArray, + min_ref: &'g BucketArray, + ) { + let min_epoch = min_ref.epoch; - let hash = maybe_hash.unwrap_or_else(|| { - let key = unsafe { &this_bucket_ptr.deref().key }; + let mut current_ptr = (current_ref as *const BucketArray).into(); + let min_ptr: Shared<'g, _> = (min_ref as *const BucketArray).into(); - self.get_hash(key) - }); + loop { + if current_ref.epoch >= min_epoch { + return; + } - next_array.insert(this_bucket_ptr, hash, guard); - maybe_hash = Some(hash); - } + match self.bucket_array.compare_and_set_weak( + current_ptr, + min_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(_) => unsafe { bucket::defer_acquire_destroy(guard, current_ptr) }, + Err(_) => { + let new_ptr = self.bucket_array.load_consume(guard); + 
assert!(!new_ptr.is_null()); - // strong CAS to avoid spurious duplicate re-insertions - match this_bucket.compare_and_set( - this_bucket_ptr, - this_bucket_ptr.with_tag(this_bucket_ptr.tag().with_redirect()), - (Ordering::Release, Ordering::Relaxed), - guard, - ) { - Ok(_) => break, - Err(_) => this_bucket_ptr = this_bucket.load_consume(guard), + current_ptr = new_ptr; + current_ref = unsafe { new_ptr.as_ref() }.unwrap(); } } } } - - unsafe fn get_next_unchecked(&self, guard: &'g Guard) -> &'g BucketArray { - let next_array_ptr = self.next_array.load_consume(guard); - assert!(!next_array_ptr.is_null()); - - let next_array = next_array_ptr.deref(); - self.grow_into(next_array, guard); - - next_array - } } -struct BucketAndParent<'a, K: Hash + Eq, V, S: BuildHasher> { - bucket: Shared<'a, Bucket>, - parent: Shared<'a, BucketArray>, -} - -#[repr(align(2))] -struct Bucket { - key: K, - maybe_value: Option, -} - -enum FunctionOrValue T, T> { - Function(F), - Value(T), -} - -impl T, T> FunctionOrValue { - fn into_value(self) -> T { - match self { - FunctionOrValue::Function(f) => f(), - FunctionOrValue::Value(v) => v, - } - } -} +impl Drop for HashMap { + fn drop(&mut self) { + let guard = unsafe { &crossbeam_epoch::unprotected() }; + atomic::fence(Ordering::Acquire); -impl Bucket { - fn is_tombstone(&self) -> bool { - self.maybe_value.is_none() - } -} + let mut current_ptr = self.bucket_array.load(Ordering::Relaxed, guard); -enum KeyOrBucket { - Key(K), - Bucket(Owned>), -} + while let Some(current_ref) = unsafe { current_ptr.as_ref() } { + let next_ptr = current_ref.next.load(Ordering::Relaxed, guard); -impl KeyOrBucket { - fn into_bucket_with_value(self, value: V) -> Owned> { - match self { - KeyOrBucket::Key(key) => Owned::new(Bucket { - key, - maybe_value: Some(value), - }), - KeyOrBucket::Bucket(mut bucket_ptr) => { - bucket_ptr.maybe_value = Some(value); - - bucket_ptr + for this_bucket_ptr in current_ref + .buckets + .iter() + .map(|b| b.load(Ordering::Relaxed, 
guard)) + .filter(|p| !p.is_null()) + .filter(|p| next_ptr.is_null() || p.tag() & bucket::TOMBSTONE_TAG == 0) + { + // only delete tombstones from the newest bucket array + // the only way this becomes a memory leak is if there was a panic during a rehash, + // in which case i'm going to say that running destructors and freeing memory is + // best-effort, and my best effort is to not do it + unsafe { bucket::defer_acquire_destroy(guard, this_bucket_ptr) }; } - } - } - fn as_key(&self) -> &K { - match self { - KeyOrBucket::Key(key) => &key, - KeyOrBucket::Bucket(bucket) => &bucket.key, + unsafe { bucket::defer_acquire_destroy(guard, current_ptr) }; + + current_ptr = next_ptr; } } } diff --git a/src/map/bucket.rs b/src/map/bucket.rs new file mode 100644 index 0000000..8d09d53 --- /dev/null +++ b/src/map/bucket.rs @@ -0,0 +1,798 @@ +// MIT License +// +// Copyright (c) 2020 Gregory Meyer +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation files +// (the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of the Software, +// and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +use std::{ + borrow::Borrow, + hash::{BuildHasher, Hash, Hasher}, + mem::{self, MaybeUninit}, + ptr, + sync::atomic::{self, Ordering}, +}; + +use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared}; + +pub(crate) struct BucketArray { + pub(crate) buckets: Box<[Atomic>]>, + pub(crate) next: Atomic>, + pub(crate) epoch: usize, +} + +impl BucketArray { + pub(crate) fn with_capacity(epoch: usize, capacity: usize) -> Self { + let real_capacity = (capacity * 2).next_power_of_two(); + let mut buckets = Vec::with_capacity(real_capacity); + + for _ in 0..real_capacity { + buckets.push(Atomic::null()); + } + + let buckets = buckets.into_boxed_slice(); + + Self { + buckets, + next: Atomic::null(), + epoch, + } + } + + pub(crate) fn capacity(&self) -> usize { + assert!(self.buckets.len().is_power_of_two()); + + self.buckets.len() / 2 + } +} + +impl<'g, K: 'g + Eq, V: 'g> BucketArray { + pub(crate) fn get( + &self, + guard: &'g Guard, + hash: u64, + key: &Q, + ) -> Result>, RelocatedError> + where + K: Borrow, + { + let loop_result = self.probe_loop(guard, hash, |_, _, this_bucket_ptr| { + let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } + { + this_bucket_ref + } else { + return ProbeLoopAction::Return(Shared::null()); + }; + + let this_key = &this_bucket_ref.key; + + if this_key.borrow() != key { + return ProbeLoopAction::Continue; + } + + let result_ptr = if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 { + this_bucket_ptr + } else { + Shared::null() + }; + + ProbeLoopAction::Return(result_ptr) + }); + + match loop_result { + ProbeLoopResult::Returned(t) => Ok(t), + ProbeLoopResult::LoopEnded => Ok(Shared::null()), + ProbeLoopResult::FoundSentinelTag => Err(RelocatedError), + } + } + + pub(crate) fn insert( + &self, + guard: &'g Guard, + hash: u64, + bucket_ptr: Owned>, + ) -> Result>, Owned>> { + let mut maybe_bucket_ptr = Some(bucket_ptr); + + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, 
this_bucket_ptr| { + let bucket_ptr = maybe_bucket_ptr.take().unwrap(); + let key = &bucket_ptr.key; + + if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } { + if this_key != key { + maybe_bucket_ptr = Some(bucket_ptr); + + return ProbeLoopAction::Continue; + } + } + + match this_bucket.compare_and_set_weak( + this_bucket_ptr, + bucket_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(_) => ProbeLoopAction::Return(this_bucket_ptr), + Err(CompareAndSetError { new, .. }) => { + maybe_bucket_ptr = Some(new); + + ProbeLoopAction::Reload + } + } + }); + + loop_result + .returned() + .ok_or_else(|| maybe_bucket_ptr.unwrap()) + } + + pub(crate) fn remove_if bool>( + &self, + guard: &'g Guard, + hash: u64, + key: &Q, + mut condition: F, + ) -> Result>, F> + where + K: Borrow, + { + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| { + let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } + { + this_bucket_ref + } else { + return ProbeLoopAction::Return(Shared::null()); + }; + + let this_key = &this_bucket_ref.key; + + if this_key.borrow() != key { + return ProbeLoopAction::Continue; + } else if this_bucket_ptr.tag() & TOMBSTONE_TAG != 0 { + return ProbeLoopAction::Return(Shared::null()); + } + + let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() }; + + if !condition(this_key, this_value) { + return ProbeLoopAction::Return(Shared::null()); + } + + let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG); + + match this_bucket.compare_and_set_weak( + this_bucket_ptr, + new_bucket_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(_) => ProbeLoopAction::Return(new_bucket_ptr), + Err(_) => ProbeLoopAction::Reload, + } + }); + + match loop_result { + ProbeLoopResult::Returned(t) => Ok(t), + ProbeLoopResult::LoopEnded => Ok(Shared::null()), + ProbeLoopResult::FoundSentinelTag => Err(condition), + } + } + + pub(crate) fn modify V>( + 
&self, + guard: &'g Guard, + hash: u64, + key_or_owned_bucket: KeyOrOwnedBucket, + mut modifier: F, + ) -> Result>, (KeyOrOwnedBucket, F)> { + let mut maybe_key_or_owned_bucket = Some(key_or_owned_bucket); + + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| { + let key_or_owned_bucket = maybe_key_or_owned_bucket.take().unwrap(); + + let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } + { + this_bucket_ref + } else { + maybe_key_or_owned_bucket = Some(key_or_owned_bucket); + + return ProbeLoopAction::Return(Shared::null()); + }; + + let this_key = &this_bucket_ref.key; + let key = key_or_owned_bucket.key(); + + if key != this_key { + maybe_key_or_owned_bucket = Some(key_or_owned_bucket); + + return ProbeLoopAction::Continue; + } + + if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 { + let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() }; + let new_value = modifier(this_key, this_value); + let new_bucket = key_or_owned_bucket.into_bucket(new_value); + + if let Err(CompareAndSetError { new, .. 
}) = this_bucket.compare_and_set_weak( + this_bucket_ptr, + new_bucket, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + maybe_key_or_owned_bucket = Some(KeyOrOwnedBucket::OwnedBucket(new)); + + ProbeLoopAction::Reload + } else { + ProbeLoopAction::Return(this_bucket_ptr) + } + } else { + ProbeLoopAction::Return(Shared::null()) + } + }); + + loop_result + .returned() + .ok_or_else(|| (maybe_key_or_owned_bucket.unwrap(), modifier)) + } + + pub(crate) fn insert_or_modify V, G: FnMut(&K, &V) -> V>( + &self, + guard: &'g Guard, + hash: u64, + state: InsertOrModifyState, + mut modifier: G, + ) -> Result>, (InsertOrModifyState, G)> { + let mut maybe_state = Some(state); + + let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| { + let state = maybe_state.take().unwrap(); + + let (new_bucket, maybe_insert_value) = + if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { + let this_key = &this_bucket_ref.key; + + if this_key != state.key() { + maybe_state = Some(state); + + return ProbeLoopAction::Continue; + } + + if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 { + let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() }; + let new_value = modifier(this_key, this_value); + + let (new_bucket, insert_value) = state.into_modify_bucket(new_value); + + (new_bucket, Some(insert_value)) + } else { + (state.into_insert_bucket(), None) + } + } else { + (state.into_insert_bucket(), None) + }; + + if let Err(CompareAndSetError { new, .. 
}) = this_bucket.compare_and_set_weak( + this_bucket_ptr, + new_bucket, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + maybe_state = Some(InsertOrModifyState::from_bucket_value( + new, + maybe_insert_value, + )); + + ProbeLoopAction::Reload + } else { + ProbeLoopAction::Return(this_bucket_ptr) + } + }); + + loop_result + .returned() + .ok_or_else(|| (maybe_state.unwrap(), modifier)) + } + + fn insert_for_grow( + &self, + guard: &'g Guard, + hash: u64, + bucket_ptr: Shared<'g, Bucket>, + ) -> Option { + assert!(!bucket_ptr.is_null()); + assert_eq!(bucket_ptr.tag() & SENTINEL_TAG, 0); + assert_ne!(bucket_ptr.tag() & BORROWED_TAG, 0); + + let key = &unsafe { bucket_ptr.deref() }.key; + + let loop_result = self.probe_loop(guard, hash, |i, this_bucket, this_bucket_ptr| { + if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } { + if this_bucket_ptr == bucket_ptr { + return ProbeLoopAction::Return(None); + } else if this_key != key { + return ProbeLoopAction::Continue; + } else if this_bucket_ptr.tag() & BORROWED_TAG == 0 { + return ProbeLoopAction::Return(None); + } + } + + if this_bucket_ptr.is_null() && bucket_ptr.tag() & TOMBSTONE_TAG != 0 { + ProbeLoopAction::Return(None) + } else if this_bucket + .compare_and_set_weak( + this_bucket_ptr, + bucket_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) + .is_ok() + { + ProbeLoopAction::Return(Some(i)) + } else { + ProbeLoopAction::Reload + } + }); + + loop_result.returned().flatten() + } +} + +impl<'g, K: 'g, V: 'g> BucketArray { + fn probe_loop< + F: FnMut(usize, &Atomic>, Shared<'g, Bucket>) -> ProbeLoopAction, + T, + >( + &self, + guard: &'g Guard, + hash: u64, + mut f: F, + ) -> ProbeLoopResult { + let offset = hash as usize & (self.buckets.len() - 1); + + for i in + (0..self.buckets.len()).map(|i| (i.wrapping_add(offset)) & (self.buckets.len() - 1)) + { + let this_bucket = &self.buckets[i]; + + loop { + let this_bucket_ptr = this_bucket.load_consume(guard); + + if 
this_bucket_ptr.tag() & SENTINEL_TAG != 0 { + return ProbeLoopResult::FoundSentinelTag; + } + + match f(i, this_bucket, this_bucket_ptr) { + ProbeLoopAction::Continue => break, + ProbeLoopAction::Reload => (), + ProbeLoopAction::Return(t) => return ProbeLoopResult::Returned(t), + } + } + } + + ProbeLoopResult::LoopEnded + } + + pub(crate) fn rehash( + &self, + guard: &'g Guard, + build_hasher: &H, + ) -> &'g BucketArray + where + K: Hash + Eq, + { + let next_array = self.next_array(guard); + assert!(self.buckets.len() <= next_array.buckets.len()); + + for this_bucket in self.buckets.iter() { + let mut maybe_state: Option<(usize, Shared<'g, Bucket>)> = None; + + loop { + let this_bucket_ptr = this_bucket.load_consume(guard); + + if this_bucket_ptr.tag() & SENTINEL_TAG != 0 { + break; + } + + let to_put_ptr = this_bucket_ptr.with_tag(this_bucket_ptr.tag() | BORROWED_TAG); + + if let Some((index, mut next_bucket_ptr)) = maybe_state { + assert!(!this_bucket_ptr.is_null()); + + let next_bucket = &next_array.buckets[index]; + + while next_bucket_ptr.tag() & BORROWED_TAG != 0 + && next_bucket + .compare_and_set_weak( + next_bucket_ptr, + to_put_ptr, + (Ordering::Release, Ordering::Relaxed), + guard, + ) + .is_err() + { + next_bucket_ptr = next_bucket.load_consume(guard); + } + } else if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } { + let key = &this_bucket_ref.key; + let hash = hash(build_hasher, key); + + if let Some(index) = next_array.insert_for_grow(guard, hash, to_put_ptr) { + maybe_state = Some((index, to_put_ptr)); + } + } + + if this_bucket + .compare_and_set_weak( + this_bucket_ptr, + Shared::null().with_tag(SENTINEL_TAG), + (Ordering::Release, Ordering::Relaxed), + guard, + ) + .is_ok() + { + if !this_bucket_ptr.is_null() + && this_bucket_ptr.tag() & TOMBSTONE_TAG != 0 + && maybe_state.is_none() + { + unsafe { defer_destroy_bucket(guard, this_bucket_ptr) }; + } + + break; + } + } + } + + next_array + } + + fn next_array(&self, guard: &'g 
Guard) -> &'g BucketArray { + let mut maybe_new_next = None; + + loop { + let next_ptr = self.next.load_consume(guard); + + if let Some(next_ref) = unsafe { next_ptr.as_ref() } { + return next_ref; + } + + let new_next = maybe_new_next.unwrap_or_else(|| { + Owned::new(BucketArray::with_capacity( + self.epoch + 1, + self.buckets.len(), + )) + }); + + match self.next.compare_and_set_weak( + Shared::null(), + new_next, + (Ordering::Release, Ordering::Relaxed), + guard, + ) { + Ok(p) => return unsafe { p.deref() }, + Err(CompareAndSetError { new, .. }) => { + maybe_new_next = Some(new); + } + } + } + } +} + +#[repr(align(8))] +#[derive(Debug)] +pub(crate) struct Bucket { + pub(crate) key: K, + pub(crate) maybe_value: MaybeUninit, +} + +impl Bucket { + pub(crate) fn new(key: K, value: V) -> Bucket { + Bucket { + key, + maybe_value: MaybeUninit::new(value), + } + } +} + +#[derive(Debug, Eq, PartialEq)] +pub(crate) struct RelocatedError; + +pub(crate) enum KeyOrOwnedBucket { + Key(K), + OwnedBucket(Owned>), +} + +impl KeyOrOwnedBucket { + fn key(&self) -> &K { + match self { + Self::Key(k) => k, + Self::OwnedBucket(b) => &b.key, + } + } + + fn into_bucket(self, value: V) -> Owned> { + match self { + Self::Key(k) => Owned::new(Bucket::new(k, value)), + Self::OwnedBucket(mut b) => { + unsafe { + mem::drop( + mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init(), + ) + }; + + b + } + } + } +} + +pub(crate) enum InsertOrModifyState V> { + New(K, F), + AttemptedInsertion(Owned>), + AttemptedModification(Owned>, ValueOrFunction), +} + +impl V> InsertOrModifyState { + fn from_bucket_value( + bucket: Owned>, + value_or_function: Option>, + ) -> Self { + if let Some(value_or_function) = value_or_function { + Self::AttemptedModification(bucket, value_or_function) + } else { + Self::AttemptedInsertion(bucket) + } + } + + fn key(&self) -> &K { + match self { + InsertOrModifyState::New(k, _) => &k, + InsertOrModifyState::AttemptedInsertion(b) + | 
InsertOrModifyState::AttemptedModification(b, _) => &b.key, + } + } + + fn into_insert_bucket(self) -> Owned> { + match self { + InsertOrModifyState::New(k, f) => Owned::new(Bucket::new(k, f())), + InsertOrModifyState::AttemptedInsertion(b) => b, + InsertOrModifyState::AttemptedModification(mut b, v_or_f) => { + unsafe { + mem::drop( + mem::replace(&mut b.maybe_value, MaybeUninit::new(v_or_f.into_value())) + .assume_init(), + ) + }; + + b + } + } + } + + fn into_modify_bucket(self, value: V) -> (Owned>, ValueOrFunction) { + match self { + InsertOrModifyState::New(k, f) => ( + Owned::new(Bucket::new(k, value)), + ValueOrFunction::Function(f), + ), + InsertOrModifyState::AttemptedInsertion(mut b) => { + let insert_value = unsafe { + mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init() + }; + + (b, ValueOrFunction::Value(insert_value)) + } + InsertOrModifyState::AttemptedModification(mut b, v_or_f) => { + unsafe { + mem::drop( + mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init(), + ) + }; + + (b, v_or_f) + } + } + } +} + +pub(crate) enum ValueOrFunction V> { + Value(V), + Function(F), +} + +impl V> ValueOrFunction { + fn into_value(self) -> V { + match self { + ValueOrFunction::Value(v) => v, + ValueOrFunction::Function(f) => f(), + } + } +} + +pub(crate) fn hash(build_hasher: &H, key: &K) -> u64 { + let mut hasher = build_hasher.build_hasher(); + key.hash(&mut hasher); + + hasher.finish() +} + +enum ProbeLoopAction { + Continue, + Reload, + Return(T), +} + +enum ProbeLoopResult { + LoopEnded, + FoundSentinelTag, + Returned(T), +} + +impl ProbeLoopResult { + fn returned(self) -> Option { + match self { + Self::Returned(t) => Some(t), + Self::LoopEnded | Self::FoundSentinelTag => None, + } + } +} + +pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>( + guard: &'g Guard, + mut ptr: Shared<'g, Bucket>, +) { + assert!(!ptr.is_null()); + + guard.defer_unchecked(move || { + atomic::fence(Ordering::Acquire); + + if ptr.tag() & 
TOMBSTONE_TAG == 0 { + ptr::drop_in_place(ptr.deref_mut().maybe_value.as_mut_ptr()); + } + + mem::drop(ptr.into_owned()); + }); +} + +pub(crate) unsafe fn defer_destroy_tombstone<'g, K, V>( + guard: &'g Guard, + mut ptr: Shared<'g, Bucket>, +) { + assert!(!ptr.is_null()); + assert_ne!(ptr.tag() & TOMBSTONE_TAG, 0); + + atomic::fence(Ordering::Acquire); + // read the value now, but defer its destruction for later + let value = ptr::read(ptr.deref_mut().maybe_value.as_ptr()); + + // to be entirely honest, i don't know what order deferred functions are + // called in crossbeam-epoch. in the case that the deferred functions are + // called out of order, this prevents that from being an issue. + guard.defer_unchecked(move || mem::drop(value)); +} + +pub(crate) unsafe fn defer_acquire_destroy<'g, T>(guard: &'g Guard, ptr: Shared<'g, T>) { + assert!(!ptr.is_null()); + + guard.defer_unchecked(move || { + atomic::fence(Ordering::Acquire); + mem::drop(ptr.into_owned()); + }); +} + +pub(crate) const SENTINEL_TAG: usize = 0b001; // set on old table buckets when copied into a new table +pub(crate) const TOMBSTONE_TAG: usize = 0b010; // set when the value has been destroyed +pub(crate) const BORROWED_TAG: usize = 0b100; // set on new table buckets when copied from an old table + +#[cfg(test)] +mod tests { + use super::*; + + use ahash::RandomState; + + #[test] + fn get_insert_remove() { + let build_hasher = RandomState::new(); + let buckets = BucketArray::with_capacity(0, 8); + let guard = unsafe { &crossbeam_epoch::unprotected() }; + + let k1 = "foo"; + let h1 = hash(&build_hasher, k1); + let v1 = 5; + + let k2 = "bar"; + let h2 = hash(&build_hasher, k2); + let v2 = 10; + + let k3 = "baz"; + let h3 = hash(&build_hasher, k3); + let v3 = 15; + + assert_eq!(buckets.get(guard, h1, k1), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h2, k2), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + let b1 = Owned::new(Bucket::new(k1, 
v1)).into_shared(guard); + assert!(is_ok_null( + buckets.insert(guard, h1, unsafe { b1.into_owned() }) + )); + + assert_eq!(buckets.get(guard, h1, k1), Ok(b1)); + assert_eq!(buckets.get(guard, h2, k2), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + let b2 = Owned::new(Bucket::new(k2, v2)).into_shared(guard); + assert!(is_ok_null( + buckets.insert(guard, h2, unsafe { b2.into_owned() }) + )); + + assert_eq!(buckets.get(guard, h1, k1), Ok(b1)); + assert_eq!(buckets.get(guard, h2, k2), Ok(b2)); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + let b3 = Owned::new(Bucket::new(k3, v3)).into_shared(guard); + assert!(is_ok_null( + buckets.insert(guard, h3, unsafe { b3.into_owned() }) + )); + + assert_eq!(buckets.get(guard, h1, k1), Ok(b1)); + assert_eq!(buckets.get(guard, h2, k2), Ok(b2)); + assert_eq!(buckets.get(guard, h3, k3), Ok(b3)); + + assert_eq!( + buckets.remove_if(guard, h1, k1, |_, _| true).ok().unwrap(), + b1.with_tag(TOMBSTONE_TAG) + ); + unsafe { defer_destroy_tombstone(guard, b1.with_tag(TOMBSTONE_TAG)) }; + assert_eq!( + buckets.remove_if(guard, h2, k2, |_, _| true).ok().unwrap(), + b2.with_tag(TOMBSTONE_TAG) + ); + unsafe { defer_destroy_tombstone(guard, b2.with_tag(TOMBSTONE_TAG)) }; + assert_eq!( + buckets.remove_if(guard, h3, k3, |_, _| true).ok().unwrap(), + b3.with_tag(TOMBSTONE_TAG) + ); + unsafe { defer_destroy_tombstone(guard, b3.with_tag(TOMBSTONE_TAG)) }; + + assert_eq!(buckets.get(guard, h1, k1), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h2, k2), Ok(Shared::null())); + assert_eq!(buckets.get(guard, h3, k3), Ok(Shared::null())); + + for this_bucket in buckets.buckets.iter() { + let this_bucket_ptr = this_bucket.swap(Shared::null(), Ordering::Relaxed, guard); + + if this_bucket_ptr.is_null() { + continue; + } + + unsafe { + defer_destroy_bucket(guard, this_bucket_ptr); + } + } + } + + fn is_ok_null<'g, K, V, E>(maybe_bucket_ptr: Result>, E>) -> bool { + if let Ok(bucket_ptr) = 
maybe_bucket_ptr { + bucket_ptr.is_null() + } else { + false + } + } +} diff --git a/src/map/tests.rs b/src/map/tests.rs new file mode 100644 index 0000000..bec0ae9 --- /dev/null +++ b/src/map/tests.rs @@ -0,0 +1,903 @@ +// MIT License +// +// Copyright (c) 2020 Gregory Meyer +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation files +// (the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of the Software, +// and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +mod util; + +use util::{DropNotifier, NoisyDropper}; + +use super::*; + +use std::{ + iter, + sync::{Arc, Barrier}, + thread::{self, JoinHandle}, +}; + +#[test] +fn insertion() { + const MAX_VALUE: i32 = 512; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, i), None); + + assert!(!map.is_empty()); + assert_eq!(map.len(), (i + 1) as usize); + + for j in 0..=i { + assert_eq!(map.get(&j), Some(j)); + assert_eq!(map.insert(j, j), Some(j)); + } + + for k in i + 1..MAX_VALUE { + assert_eq!(map.get(&k), None); + } + } +} + +#[test] +fn growth() { + const MAX_VALUE: i32 = 512; + + let map = HashMap::new(); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, i), None); + + assert!(!map.is_empty()); + assert_eq!(map.len(), (i + 1) as usize); + + for j in 0..=i { + assert_eq!(map.get(&j), Some(j)); + assert_eq!(map.insert(j, j), Some(j)); + } + + for k in i + 1..MAX_VALUE { + assert_eq!(map.get(&k), None); + } + } +} + +#[test] +fn concurrent_insertion() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; + + let map = Arc::new(HashMap::with_capacity(MAX_INSERTED_VALUE as usize)); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn concurrent_growth() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * 
MAX_VALUE; + + let map = Arc::new(HashMap::new()); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(|t| t.join()) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn removal() { + const MAX_VALUE: i32 = 512; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, i), None); + } + + for i in 0..MAX_VALUE { + assert_eq!(map.remove(&i), Some(i)); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn concurrent_removal() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; + + let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.insert(i, i), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.remove(&j), Some(j)); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(|t| t.join()) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), 0); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn concurrent_insertion_and_removal() { + const MAX_VALUE: i32 = 512; + 
const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; + const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; + + let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.insert(i, i), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); + + let insert_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + let remove_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.remove(&j), Some(j)); + } + }) + }) + .collect(); + + for result in insert_threads + .into_iter() + .chain(remove_threads.into_iter()) + .map(|t| t.join()) + { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), INSERTED_MIDPOINT as usize); + + for i in 0..INSERTED_MIDPOINT { + assert_eq!(map.get(&i), Some(i)); + } + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn concurrent_growth_and_removal() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2; + const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2; + + let map = HashMap::with_capacity(INSERTED_MIDPOINT as usize); + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.insert(i, i), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS * 2)); + + let insert_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + 
let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.insert(j, j), None); + } + }) + }) + .collect(); + + let remove_threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE)) { + assert_eq!(map.remove(&j), Some(j)); + } + }) + }) + .collect(); + + for result in insert_threads + .into_iter() + .chain(remove_threads.into_iter()) + .map(JoinHandle::join) + { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), INSERTED_MIDPOINT as usize); + + for i in 0..INSERTED_MIDPOINT { + assert_eq!(map.get(&i), Some(i)); + } + + for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn modify() { + let map = HashMap::new(); + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + assert_eq!(map.modify("foo", |_, x| x * 2), None); + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + map.insert("foo", 1); + assert_eq!(map.modify("foo", |_, x| x * 2), Some(1)); + + assert!(!map.is_empty()); + assert_eq!(map.len(), 1); + + map.remove("foo"); + assert_eq!(map.modify("foo", |_, x| x * 2), None); + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); +} + +#[test] +fn concurrent_modification() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE; + + let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + map.insert(i, i); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in (i as i32 * 
MAX_VALUE)..((i as i32 + 1) * MAX_VALUE) { + assert_eq!(map.modify(j, |_, x| x * 2), Some(j)); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_INSERTED_VALUE as usize); + + for i in 0..MAX_INSERTED_VALUE { + assert_eq!(map.get(&i), Some(i * 2)); + } +} + +#[test] +fn concurrent_overlapped_modification() { + const MAX_VALUE: i32 = 512; + const NUM_THREADS: usize = 64; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.insert(i, 0), None); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for i in 0..MAX_VALUE { + assert!(map.modify(i, |_, x| x + 1).is_some()); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); + } +} + +#[test] +fn insert_or_modify() { + let map = HashMap::new(); + + assert_eq!(map.insert_or_modify("foo", 1, |_, x| x + 1), None); + assert_eq!(map.get("foo"), Some(1)); + + assert_eq!(map.insert_or_modify("foo", 1, |_, x| x + 1), Some(1)); + assert_eq!(map.get("foo"), Some(2)); +} + +#[test] +fn concurrent_insert_or_modify() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = Arc::new(HashMap::new()); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..MAX_VALUE { + map.insert_or_modify(j, 1, |_, x| x + 1); + } + }) + }) + .collect(); + + for result in 
threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(NUM_THREADS as i32)); + } +} + +#[test] +fn concurrent_overlapped_insertion() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = Arc::new(HashMap::with_capacity(MAX_VALUE as usize)); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..MAX_VALUE { + map.insert(j, j); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn concurrent_overlapped_growth() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = Arc::new(HashMap::with_capacity(1)); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..MAX_VALUE { + map.insert(j, j); + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert_eq!(map.len(), MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), Some(i)); + } +} + +#[test] +fn concurrent_overlapped_removal() { + const NUM_THREADS: usize = 64; + const MAX_VALUE: i32 = 512; + + let map = HashMap::with_capacity(MAX_VALUE as usize); + + for i in 0..MAX_VALUE { + map.insert(i, i); + } + + let map = Arc::new(map); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let threads: Vec<_> = (0..NUM_THREADS) + .map(|_| { + let map = map.clone(); + let barrier = barrier.clone(); + + thread::spawn(move || { 
+ barrier.wait(); + + for j in 0..MAX_VALUE { + let prev_value = map.remove(&j); + + if let Some(v) = prev_value { + assert_eq!(v, j); + } + } + }) + }) + .collect(); + + for result in threads.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + for i in 0..MAX_VALUE { + assert_eq!(map.get(&i), None); + } +} + +#[test] +fn drop_value() { + let key_parent = Arc::new(DropNotifier::new()); + let value_parent = Arc::new(DropNotifier::new()); + + { + let map = HashMap::new(); + + assert_eq!( + map.insert_and( + NoisyDropper::new(key_parent.clone(), 0), + NoisyDropper::new(value_parent.clone(), 0), + |_| () + ), + None + ); + assert!(!map.is_empty()); + assert_eq!(map.len(), 1); + map.get_and(&0, |v| assert_eq!(v, &0)); + + map.remove_and(&0, |v| assert_eq!(v, &0)); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + assert_eq!(map.get_and(&0, |_| ()), None); + + util::run_deferred(); + + assert!(!key_parent.was_dropped()); + assert!(value_parent.was_dropped()); + } + + util::run_deferred(); + + assert!(key_parent.was_dropped()); + assert!(value_parent.was_dropped()); +} + +#[test] +fn drop_many_values() { + const NUM_VALUES: usize = 1 << 16; + + let key_parents: Vec<_> = iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(); + let value_parents: Vec<_> = iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(); + + { + let map = HashMap::new(); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + for (i, (this_key_parent, this_value_parent)) in + key_parents.iter().zip(value_parents.iter()).enumerate() + { + assert_eq!( + map.insert_and( + NoisyDropper::new(this_key_parent.clone(), i), + NoisyDropper::new(this_value_parent.clone(), i), + |_| () + ), + None + ); + + assert!(!map.is_empty()); + assert_eq!(map.len(), i + 1); + } + + for i in 0..NUM_VALUES { + assert_eq!( + map.get_key_value_and(&i, |k, v| { + assert_eq!(*k, i); + 
assert_eq!(*v, i); + }), + Some(()) + ); + } + + for i in 0..NUM_VALUES { + assert_eq!( + map.remove_entry_and(&i, |k, v| { + assert_eq!(*k, i); + assert_eq!(*v, i); + }), + Some(()) + ); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(!this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(this_value_parent.was_dropped()); + } + + for i in 0..NUM_VALUES { + assert_eq!(map.get_and(&i, |_| ()), None); + } + } + + util::run_deferred(); + + for this_key_parent in key_parents.into_iter() { + assert!(this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.into_iter() { + assert!(this_value_parent.was_dropped()); + } +} + +#[test] +fn drop_many_values_concurrent() { + const NUM_THREADS: usize = 64; + const NUM_VALUES_PER_THREAD: usize = 512; + const NUM_VALUES: usize = NUM_THREADS * NUM_VALUES_PER_THREAD; + + let key_parents: Arc> = Arc::new( + iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(), + ); + let value_parents: Arc> = Arc::new( + iter::repeat_with(|| Arc::new(DropNotifier::new())) + .take(NUM_VALUES) + .collect(), + ); + + { + let map = Arc::new(HashMap::new()); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let handles: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = Arc::clone(&map); + let barrier = Arc::clone(&barrier); + let key_parents = Arc::clone(&key_parents); + let value_parents = Arc::clone(&value_parents); + + thread::spawn(move || { + barrier.wait(); + + let these_key_parents = + &key_parents[i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD]; + let these_value_parents = + &value_parents[i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD]; + + for (j, (this_key_parent, this_value_parent)) in these_key_parents + .iter() + .zip(these_value_parents.iter()) + .enumerate() + 
{ + let key_value = i * NUM_VALUES_PER_THREAD + j; + + assert_eq!( + map.insert_and( + NoisyDropper::new(this_key_parent.clone(), key_value), + NoisyDropper::new(this_value_parent.clone(), key_value), + |_| () + ), + None + ); + } + }) + }) + .collect(); + + for result in handles.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(!map.is_empty()); + assert_eq!(map.len(), NUM_VALUES); + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(!this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(!this_value_parent.was_dropped()); + } + + for i in 0..NUM_VALUES { + assert_eq!( + map.get_key_value_and(&i, |k, v| { + assert_eq!(*k, i); + assert_eq!(*v, i); + }), + Some(()) + ); + } + + let handles: Vec<_> = (0..NUM_THREADS) + .map(|i| { + let map = Arc::clone(&map); + let barrier = Arc::clone(&barrier); + + thread::spawn(move || { + barrier.wait(); + + for j in 0..NUM_VALUES_PER_THREAD { + let key_value = i * NUM_VALUES_PER_THREAD + j; + + assert_eq!( + map.remove_entry_and(&key_value, |k, v| { + assert_eq!(*k, key_value); + assert_eq!(*v, key_value); + }), + Some(()) + ); + } + }) + }) + .collect(); + + for result in handles.into_iter().map(JoinHandle::join) { + assert!(result.is_ok()); + } + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(!this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(this_value_parent.was_dropped()); + } + + for i in 0..NUM_VALUES { + assert_eq!(map.get_and(&i, |_| ()), None); + } + } + + util::run_deferred(); + + for this_key_parent in key_parents.iter() { + assert!(this_key_parent.was_dropped()); + } + + for this_value_parent in value_parents.iter() { + assert!(this_value_parent.was_dropped()); + } +} + +#[test] +fn remove_if() { + const NUM_VALUES: usize = 512; + + let is_even = |_: &usize, v: &usize| *v % 2 
== 0; + + let map = HashMap::new(); + + for i in 0..NUM_VALUES { + assert_eq!(map.insert(i, i), None); + } + + for i in 0..NUM_VALUES { + if is_even(&i, &i) { + assert_eq!(map.remove_if(&i, is_even), Some(i)); + } else { + assert_eq!(map.remove_if(&i, is_even), None); + } + } + + for i in (0..NUM_VALUES).filter(|i| i % 2 == 0) { + assert_eq!(map.get(&i), None); + } + + for i in (0..NUM_VALUES).filter(|i| i % 2 != 0) { + assert_eq!(map.get(&i), Some(i)); + } +} diff --git a/src/map/tests/util.rs b/src/map/tests/util.rs new file mode 100644 index 0000000..ad5af00 --- /dev/null +++ b/src/map/tests/util.rs @@ -0,0 +1,132 @@ +// MIT License +// +// Copyright (c) 2020 Gregory Meyer +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation files +// (the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of the Software, +// and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +use std::{ + borrow::{Borrow, BorrowMut}, + hash::{Hash, Hasher}, + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +#[derive(Debug)] +pub(crate) struct NoisyDropper { + parent: Arc, + pub elem: T, +} + +impl NoisyDropper { + pub(crate) fn new(parent: Arc, elem: T) -> Self { + Self { parent, elem } + } +} + +impl Drop for NoisyDropper { + fn drop(&mut self) { + assert_eq!(self.parent.dropped.swap(true, Ordering::Relaxed), false); + } +} + +impl PartialEq for NoisyDropper { + fn eq(&self, other: &Self) -> bool { + self.elem == other.elem + } +} + +impl PartialEq for NoisyDropper { + fn eq(&self, other: &T) -> bool { + &self.elem == other + } +} + +impl Eq for NoisyDropper {} + +impl Hash for NoisyDropper { + fn hash(&self, hasher: &mut H) { + self.elem.hash(hasher); + } +} + +impl AsRef for NoisyDropper { + fn as_ref(&self) -> &T { + &self.elem + } +} + +impl AsMut for NoisyDropper { + fn as_mut(&mut self) -> &mut T { + &mut self.elem + } +} + +impl Borrow for NoisyDropper { + fn borrow(&self) -> &T { + &self.elem + } +} + +impl BorrowMut for NoisyDropper { + fn borrow_mut(&mut self) -> &mut T { + &mut self.elem + } +} + +impl Deref for NoisyDropper { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.elem + } +} + +impl DerefMut for NoisyDropper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.elem + } +} + +#[derive(Debug)] +pub(crate) struct DropNotifier { + dropped: AtomicBool, +} + +impl DropNotifier { + pub(crate) fn new() -> Self { + Self { + dropped: AtomicBool::new(false), + } + } + + pub(crate) fn was_dropped(&self) -> bool { + self.dropped.load(Ordering::Relaxed) + } +} + +pub(crate) fn run_deferred() { + for _ in 0..65536 { + crossbeam_epoch::pin().flush(); + } +}