Skip to content

Commit

Permalink
Replace gossipsub received messages from lru::LruCache with TimeCache (
Browse files Browse the repository at this point in the history
…#766)

* Replace gossipsub received messages from lru::LruCache with TimeCache

* Add metrics to track the GossipsubBehaviour::received
  • Loading branch information
sergeyboyko0791 authored Dec 12, 2020
1 parent f66fcc0 commit 3918a3d
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 12 deletions.
25 changes: 14 additions & 11 deletions mm2src/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use crate::handler::GossipsubHandler;
use crate::mcache::MessageCache;
use crate::protocol::{GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId};
use crate::time_cache::{Entry as TimeCacheEntry, TimeCache};
use crate::topic::{Topic, TopicHash};
use futures::prelude::*;
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler};
use log::{debug, error, info, trace, warn};
use lru::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use smallvec::SmallVec;
use std::time::Duration;
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct Gossipsub {
/// We keep track of the messages we received (in the format `string(source ID, seq_no)`) so that
/// we don't dispatch the same message twice if we receive it twice on the network.
/// Also store the peers from which message was received so we don't manually propagate already known message to them
received: LruCache<MessageId, SmallVec<[PeerId; 12]>>,
received: TimeCache<MessageId, SmallVec<[PeerId; 12]>>,

/// Heartbeat interval stream.
heartbeat: Interval,
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Gossipsub {
gs_config.history_length,
gs_config.message_id_fn,
),
received: LruCache::new(8192 * 8), // keep track of the last 8192 * 8 messages
received: TimeCache::new(gs_config.duplicate_cache_time),
heartbeat: Interval::new_at(
Instant::now() + gs_config.heartbeat_initial_delay,
gs_config.heartbeat_interval,
Expand Down Expand Up @@ -297,7 +297,8 @@ impl Gossipsub {
// add published message to our received caches
let msg_id = (self.config.message_id_fn)(&message);
self.mcache.put(message.clone());
self.received.put(msg_id.clone(), SmallVec::from_elem(local_peer_id, 1));
self.received
.insert(msg_id.clone(), SmallVec::from_elem(local_peer_id, 1));

debug!("Published message: {:?}", msg_id);

Expand Down Expand Up @@ -426,7 +427,7 @@ impl Gossipsub {
}

for id in ids {
if !self.received.contains(&id) {
if !self.received.contains_key(&id) {
// have not seen this message, request it
iwant_ids.insert(id);
}
Expand Down Expand Up @@ -596,15 +597,14 @@ impl Gossipsub {
fn handle_received_message(&mut self, msg: GossipsubMessage, propagation_source: &PeerId) {
let msg_id = (self.config.message_id_fn)(&msg);
debug!("Handling message: {:?} from peer: {:?}", msg_id, propagation_source);
match self.received.get_mut(&msg_id) {
Some(peers) => {
match self.received.entry(msg_id.clone()) {
TimeCacheEntry::Occupied(entry) => {
debug!("Message already received, ignoring. Message: {:?}", msg_id);
peers.push(propagation_source.clone());
entry.into_mut().push(propagation_source.clone());
return;
},
None => {
self.received
.put(msg_id.clone(), SmallVec::from_elem(propagation_source.clone(), 1));
TimeCacheEntry::Vacant(entry) => {
entry.insert(SmallVec::from_elem(propagation_source.clone(), 1));
},
}
// add to the memcache
Expand Down Expand Up @@ -1103,6 +1103,9 @@ impl Gossipsub {

pub fn get_all_peer_topics(&self) -> &HashMap<PeerId, Vec<TopicHash>> { &self.peer_topics }

/// Get count of received messages in the [`GossipsubConfig::duplicate_cache_time`] period.
pub fn get_received_messages_in_period(&self) -> (Duration, usize) { (self.received.ttl(), self.received.len()) }

pub fn get_config(&self) -> &GossipsubConfig { &self.config }

/// Adds peers to relays mesh and notifies them they are added
Expand Down
2 changes: 1 addition & 1 deletion mm2src/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ mod tests {
build_and_inject_nodes(20, vec![String::from("topic1")], GossipsubConfig::default(), true);

let msg_id = MessageId(String::from("known id"));
gs.received.put(msg_id.clone(), SmallVec::new());
gs.received.insert(msg_id.clone(), SmallVec::new());

let events_before = gs.events.len();
gs.handle_ihave(&peers[7], vec![(topic_hashes[0].clone(), vec![msg_id])]);
Expand Down
7 changes: 7 additions & 0 deletions mm2src/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ pub struct GossipsubConfig {
/// The maximum byte size for each gossip (default is 2048 bytes).
pub max_transmit_size: usize,

/// Duplicates are prevented by storing message id's of known messages in an LRU time cache.
/// This settings sets the time period that messages are stored in the cache. Duplicates can be
/// received if duplicate messages are sent at a time greater than this setting apart. The
/// default is 1 minute.
pub duplicate_cache_time: Duration,

/// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false).
pub hash_topics: bool,

Expand Down Expand Up @@ -103,6 +109,7 @@ impl Default for GossipsubConfig {
heartbeat_interval: Duration::from_secs(1),
fanout_ttl: Duration::from_secs(60),
max_transmit_size: 2048,
duplicate_cache_time: Duration::from_secs(60),
hash_topics: false, // default compatibility with floodsub
no_source_id: false,
manual_propagation: false,
Expand Down
1 change: 1 addition & 0 deletions mm2src/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ mod behaviour;
mod config;
mod handler;
mod mcache;
mod time_cache;
mod topic;

mod rpc_proto {
Expand Down
Loading

0 comments on commit 3918a3d

Please sign in to comment.