diff --git a/mm2src/gossipsub/src/behaviour.rs b/mm2src/gossipsub/src/behaviour.rs index 8ddddf2597..84884b0d96 100644 --- a/mm2src/gossipsub/src/behaviour.rs +++ b/mm2src/gossipsub/src/behaviour.rs @@ -23,12 +23,12 @@ use crate::handler::GossipsubHandler; use crate::mcache::MessageCache; use crate::protocol::{GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, MessageId}; +use crate::time_cache::{Entry as TimeCacheEntry, TimeCache}; use crate::topic::{Topic, TopicHash}; use futures::prelude::*; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler}; use log::{debug, error, info, trace, warn}; -use lru::LruCache; use rand::{seq::SliceRandom, thread_rng}; use smallvec::SmallVec; use std::time::Duration; @@ -84,7 +84,7 @@ pub struct Gossipsub { /// We keep track of the messages we received (in the format `string(source ID, seq_no)`) so that /// we don't dispatch the same message twice if we receive it twice on the network. /// Also store the peers from which message was received so we don't manually propagate already known message to them - received: LruCache>, + received: TimeCache>, /// Heartbeat interval stream. 
heartbeat: Interval, @@ -124,7 +124,7 @@ impl Gossipsub { gs_config.history_length, gs_config.message_id_fn, ), - received: LruCache::new(8192 * 8), // keep track of the last 8192 * 8 messages + received: TimeCache::new(gs_config.duplicate_cache_time), heartbeat: Interval::new_at( Instant::now() + gs_config.heartbeat_initial_delay, gs_config.heartbeat_interval, @@ -297,7 +297,8 @@ impl Gossipsub { // add published message to our received caches let msg_id = (self.config.message_id_fn)(&message); self.mcache.put(message.clone()); - self.received.put(msg_id.clone(), SmallVec::from_elem(local_peer_id, 1)); + self.received + .insert(msg_id.clone(), SmallVec::from_elem(local_peer_id, 1)); debug!("Published message: {:?}", msg_id); @@ -426,7 +427,7 @@ impl Gossipsub { } for id in ids { - if !self.received.contains(&id) { + if !self.received.contains_key(&id) { // have not seen this message, request it iwant_ids.insert(id); } @@ -596,15 +597,14 @@ impl Gossipsub { fn handle_received_message(&mut self, msg: GossipsubMessage, propagation_source: &PeerId) { let msg_id = (self.config.message_id_fn)(&msg); debug!("Handling message: {:?} from peer: {:?}", msg_id, propagation_source); - match self.received.get_mut(&msg_id) { - Some(peers) => { + match self.received.entry(msg_id.clone()) { + TimeCacheEntry::Occupied(entry) => { debug!("Message already received, ignoring. Message: {:?}", msg_id); - peers.push(propagation_source.clone()); + entry.into_mut().push(propagation_source.clone()); return; }, - None => { - self.received - .put(msg_id.clone(), SmallVec::from_elem(propagation_source.clone(), 1)); + TimeCacheEntry::Vacant(entry) => { + entry.insert(SmallVec::from_elem(propagation_source.clone(), 1)); }, } // add to the memcache @@ -1103,6 +1103,9 @@ impl Gossipsub { pub fn get_all_peer_topics(&self) -> &HashMap> { &self.peer_topics } + /// Get count of received messages in the [`GossipsubConfig::duplicate_cache_time`] period. 
+ pub fn get_received_messages_in_period(&self) -> (Duration, usize) { (self.received.ttl(), self.received.len()) } + pub fn get_config(&self) -> &GossipsubConfig { &self.config } /// Adds peers to relays mesh and notifies them they are added diff --git a/mm2src/gossipsub/src/behaviour/tests.rs b/mm2src/gossipsub/src/behaviour/tests.rs index 4de397e3d7..c6d46c9a8a 100644 --- a/mm2src/gossipsub/src/behaviour/tests.rs +++ b/mm2src/gossipsub/src/behaviour/tests.rs @@ -683,7 +683,7 @@ mod tests { build_and_inject_nodes(20, vec![String::from("topic1")], GossipsubConfig::default(), true); let msg_id = MessageId(String::from("known id")); - gs.received.put(msg_id.clone(), SmallVec::new()); + gs.received.insert(msg_id.clone(), SmallVec::new()); let events_before = gs.events.len(); gs.handle_ihave(&peers[7], vec![(topic_hashes[0].clone(), vec![msg_id])]); diff --git a/mm2src/gossipsub/src/config.rs b/mm2src/gossipsub/src/config.rs index 33a8d45b5e..cafd0c1ce1 100644 --- a/mm2src/gossipsub/src/config.rs +++ b/mm2src/gossipsub/src/config.rs @@ -64,6 +64,12 @@ pub struct GossipsubConfig { /// The maximum byte size for each gossip (default is 2048 bytes). pub max_transmit_size: usize, + /// Duplicates are prevented by storing message ids of known messages in an LRU time cache. + /// This setting sets the time period that messages are stored in the cache. Duplicates can be + /// received if duplicate messages are sent at a time greater than this setting apart. The + /// default is 1 minute. + pub duplicate_cache_time: Duration, + /// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false). 
pub hash_topics: bool, @@ -103,6 +109,7 @@ impl Default for GossipsubConfig { heartbeat_interval: Duration::from_secs(1), fanout_ttl: Duration::from_secs(60), max_transmit_size: 2048, + duplicate_cache_time: Duration::from_secs(60), hash_topics: false, // default compatibility with floodsub no_source_id: false, manual_propagation: false, diff --git a/mm2src/gossipsub/src/lib.rs b/mm2src/gossipsub/src/lib.rs index e0efa95571..e233356a2f 100644 --- a/mm2src/gossipsub/src/lib.rs +++ b/mm2src/gossipsub/src/lib.rs @@ -141,6 +141,7 @@ mod behaviour; mod config; mod handler; mod mcache; +mod time_cache; mod topic; mod rpc_proto { diff --git a/mm2src/gossipsub/src/time_cache.rs b/mm2src/gossipsub/src/time_cache.rs new file mode 100644 index 0000000000..1b166ed672 --- /dev/null +++ b/mm2src/gossipsub/src/time_cache.rs @@ -0,0 +1,298 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! 
This implements a time-based LRU cache for checking gossipsub message duplicates. + +use fnv::FnvHashMap; +use std::collections::hash_map::{self, + Entry::{Occupied, Vacant}}; +use std::collections::VecDeque; +use std::time::{Duration, Instant}; + +struct ExpiringElement { + /// The element that expires + element: Element, + /// The expire time. + expires: Instant, +} + +pub struct TimeCache { + /// Mapping a key to its value together with its latest expire time (can be updated through + /// reinserts). + map: FnvHashMap>, + /// An ordered list of keys by expires time. + list: VecDeque>, + /// The time elements remain in the cache. + ttl: Duration, +} + +pub struct OccupiedEntry<'a, K, V> { + expiration: Instant, + entry: hash_map::OccupiedEntry<'a, K, ExpiringElement>, + list: &'a mut VecDeque>, +} + +impl<'a, K, V> OccupiedEntry<'a, K, V> +where + K: Eq + std::hash::Hash + Clone, +{ + pub fn into_mut(self) -> &'a mut V { &mut self.entry.into_mut().element } + + #[allow(dead_code)] + pub fn insert_without_updating_expiration(&mut self, value: V) -> V { + //keep old expiration, only replace value of element + ::std::mem::replace(&mut self.entry.get_mut().element, value) + } + + #[allow(dead_code)] + pub fn insert_and_update_expiration(&mut self, value: V) -> V { + //We push back an additional element, the first reference in the list will be ignored + // since we also updated the expires in the map, see below. 
+ self.list.push_back(ExpiringElement { + element: self.entry.key().clone(), + expires: self.expiration, + }); + self.entry + .insert(ExpiringElement { + element: value, + expires: self.expiration, + }) + .element + } +} + +pub struct VacantEntry<'a, K, V> { + expiration: Instant, + entry: hash_map::VacantEntry<'a, K, ExpiringElement>, + list: &'a mut VecDeque>, +} + +impl<'a, K, V> VacantEntry<'a, K, V> +where + K: Eq + std::hash::Hash + Clone, +{ + pub fn insert(self, value: V) -> &'a mut V { + self.list.push_back(ExpiringElement { + element: self.entry.key().clone(), + expires: self.expiration, + }); + &mut self + .entry + .insert(ExpiringElement { + element: value, + expires: self.expiration, + }) + .element + } +} + +pub enum Entry<'a, K: 'a, V: 'a> { + Occupied(OccupiedEntry<'a, K, V>), + Vacant(VacantEntry<'a, K, V>), +} + +#[allow(dead_code)] +impl<'a, K: 'a, V: 'a> Entry<'a, K, V> +where + K: Eq + std::hash::Hash + Clone, +{ + pub fn or_insert_with V>(self, default: F) -> &'a mut V { + match self { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => entry.insert(default()), + } + } +} + +impl TimeCache +where + Key: Eq + std::hash::Hash + Clone, +{ + pub fn new(ttl: Duration) -> Self { + TimeCache { + map: FnvHashMap::default(), + list: VecDeque::new(), + ttl, + } + } + + fn remove_expired_keys(&mut self, now: Instant) { + while let Some(element) = self.list.pop_front() { + if element.expires > now { + self.list.push_front(element); + break; + } + if let Occupied(entry) = self.map.entry(element.element.clone()) { + if entry.get().expires <= now { + entry.remove(); + } + } + } + } + + pub fn entry(&mut self, key: Key) -> Entry { + let now = Instant::now(); + self.remove_expired_keys(now); + match self.map.entry(key) { + Occupied(entry) => Entry::Occupied(OccupiedEntry { + expiration: now + self.ttl, + entry, + list: &mut self.list, + }), + Vacant(entry) => Entry::Vacant(VacantEntry { + expiration: now + self.ttl, + entry, + list: &mut 
self.list, + }), + } + } + + // Inserts new element and removes any expired elements. + // + // If the key was not present this returns `true`. If the value was already present this + // returns `false`. + pub fn insert(&mut self, key: Key, value: Value) -> bool { + if let Entry::Vacant(entry) = self.entry(key) { + entry.insert(value); + true + } else { + false + } + } + + /// Empties the entire cache. + #[allow(dead_code)] + pub fn clear(&mut self) { + self.map.clear(); + self.list.clear(); + } + + pub fn contains_key(&mut self, key: &Key) -> bool { self.map.contains_key(key) } + + pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.element) } + + pub fn len(&self) -> usize { self.map.len() } + + pub fn ttl(&self) -> Duration { self.ttl } +} + +#[allow(dead_code)] +pub struct DuplicateCache(TimeCache); + +#[allow(dead_code)] +impl DuplicateCache +where + Key: Eq + std::hash::Hash + Clone, +{ + pub fn new(ttl: Duration) -> Self { Self(TimeCache::new(ttl)) } + + // Inserts new elements and removes any expired elements. + // + // If the key was not present this returns `true`. If the value was already present this + // returns `false`. 
+ pub fn insert(&mut self, key: Key) -> bool { + if let Entry::Vacant(entry) = self.0.entry(key) { + entry.insert(()); + true + } else { + false + } + } + + pub fn contains(&mut self, key: &Key) -> bool { self.0.contains_key(key) } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn time_cache_added_entries_exist() { + let mut cache = TimeCache::new(Duration::from_secs(10)); + + assert!(cache.insert("t", "tv".to_owned())); + assert!(cache.insert("e", "ev".to_owned())); + + // Should report that 't' and 'e' already exist + assert!(!cache.insert("t", "td".to_owned())); + assert!(!cache.insert("e", "ed".to_owned())); + + assert_eq!(cache.get(&"t"), Some(&"tv".to_owned())); + assert_eq!(cache.get(&"e"), Some(&"ev".to_owned())); + assert_eq!(cache.get(&"f"), None); + } + + #[test] + fn time_cache_expired() { + let mut cache = TimeCache::new(Duration::from_secs(1)); + + assert!(cache.insert("t", "tv".to_owned())); + assert_eq!(cache.get(&"t"), Some(&"tv".to_owned())); + + std::thread::sleep(Duration::from_millis(500)); + assert!(cache.insert("e", "ev".to_owned())); + assert_eq!(cache.get(&"t"), Some(&"tv".to_owned())); + assert_eq!(cache.get(&"e"), Some(&"ev".to_owned())); + + std::thread::sleep(Duration::from_millis(700)); + // insert other value to initiate the expiration + assert!(cache.insert("f", "fv".to_owned())); + // 't' must be expired already, 'e' must still be present + assert_eq!(cache.get(&"t"), None); + assert_eq!(cache.get(&"e"), Some(&"ev".to_owned())); + + std::thread::sleep(Duration::from_millis(700)); + // insert other value to initiate the expiration + assert!(cache.insert("d", "dv".to_owned())); + // both 't' and 'e' must be expired already + assert_eq!(cache.get(&"t"), None); + assert_eq!(cache.get(&"e"), None); + } + + #[test] + fn cache_added_entries_exist() { + let mut cache = DuplicateCache::new(Duration::from_secs(10)); + + cache.insert("t"); + cache.insert("e"); + + // Should report that 't' and 'e' already exist + assert!(!cache.insert("t")); + assert!(!cache.insert("e")); + 
} + + #[test] + fn cache_entries_expire() { + let mut cache = DuplicateCache::new(Duration::from_millis(100)); + + cache.insert("t"); + assert!(!cache.insert("t")); + cache.insert("e"); + //assert!(!cache.insert("t")); + assert!(!cache.insert("e")); + // sleep until cache expiry + std::thread::sleep(Duration::from_millis(101)); + // add another element to clear previous cache + cache.insert("s"); + + // should be removed from the cache + assert!(cache.insert("t")); + } +} diff --git a/mm2src/lp_native_dex.rs b/mm2src/lp_native_dex.rs index 099c135bd4..7aa99aefc3 100644 --- a/mm2src/lp_native_dex.rs +++ b/mm2src/lp_native_dex.rs @@ -500,6 +500,13 @@ pub async fn lp_init(mypubport: u16, ctx: MmArc) -> Result<(), String> { swarm.connected_relays_len() as i64 ); mm_gauge!(ctx_on_poll.metrics, "p2p.relay_mesh.len", swarm.relay_mesh_len() as i64); + let (period, received_msgs) = swarm.received_messages_in_period(); + mm_gauge!( + ctx_on_poll.metrics, + "p2p.received_messages.period_in_secs", + period.as_secs() as i64 + ); + mm_gauge!(ctx_on_poll.metrics, "p2p.received_messages.count", received_msgs as i64); }, ); try_s!(ctx.peer_id.pin(peer_id.to_string())); diff --git a/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs b/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs index 711d77c21d..fe7d35a5a5 100644 --- a/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs +++ b/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs @@ -395,6 +395,8 @@ impl AtomicDexBehaviour { pub fn connected_relays_len(&self) -> usize { self.gossipsub.connected_relays_len() } pub fn relay_mesh_len(&self) -> usize { self.gossipsub.relay_mesh_len() } + + pub fn received_messages_in_period(&self) -> (Duration, usize) { self.gossipsub.get_received_messages_in_period() } } impl NetworkBehaviourEventProcess for AtomicDexBehaviour { @@ -585,6 +587,7 @@ const ALL_NETID_7777_SEEDNODES: &[(&str, &str)] = &[ /// 1. tx to send control commands /// 2. rx emitting gossip events to processing side /// 3. 
our peer_id +#[allow(clippy::too_many_arguments)] pub fn start_gossipsub( ip: IpAddr, port: u16,