Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Oct 30, 2024
1 parent 798c9ae commit afe2212
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 1 deletion.
7 changes: 7 additions & 0 deletions mm2src/mm2_main/src/lp_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,17 @@ fn process_get_version_request(ctx: MmArc) -> Result<Option<Vec<u8>>, String> {
Ok(Some(encoded))
}

fn process_get_peer_utc_timestamp_request(_ctx: MmArc) -> Result<Option<Vec<u8>>, String> {
let timestamp = common::get_utc_timestamp();
let encoded = try_s!(encode_message(&timestamp));
Ok(Some(encoded))
}

pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result<Option<Vec<u8>>, String> {
log::debug!("Got stats request {:?}", request);
match request {
NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx),
NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(ctx),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ use serde::{Deserialize, Serialize};
pub enum NetworkInfoRequest {
/// Get MM2 version of nodes added to stats collection
GetMm2Version,
/// Get UTC timestamp in seconds from the target peer
GetPeerUtcTimestamp,
}
43 changes: 42 additions & 1 deletion mm2src/mm2_p2p/src/behaviours/atomicdex.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use common::executor::SpawnFuture;
use derive_more::Display;
use futures::channel::mpsc::UnboundedSender;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::{channel::oneshot,
future::{join_all, poll_fn},
Expand Down Expand Up @@ -29,10 +30,12 @@ use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest,
use super::ping::AdexPing;
use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour,
RequestResponseSender};
use crate::application::request_response::network_info::NetworkInfoRequest;
use crate::application::request_response::P2PRequest;
use crate::network::{get_all_network_seednodes, DEFAULT_NETID};
use crate::relay_address::{RelayAddress, RelayAddressError};
use crate::swarm_runtime::SwarmRuntime;
use crate::{NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent};
use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent};

pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash};
pub use libp2p::gossipsub::{ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent,
Expand Down Expand Up @@ -199,6 +202,26 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec<String> {
rx.await.expect("Tx should be present")
}

async fn validate_peer_time(
peer: PeerId,
mut response_tx: UnboundedSender<Option<PeerId>>,
rp_sender: RequestResponseSender,
) {
let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp);
let encoded_request = encode_message(&request)
.expect("Static type `NetworkInfoRequest::GetPeerUtcTimestamp` should never fail in serialization.");

if let PeerResponse::Ok { res } = request_one_peer(peer, encoded_request, rp_sender).await {
if let Ok(_timestamp) = decode_message::<u64>(&res) {
// TODO: get current timestamp and compare it
todo!();
};
};

// Validation failed, send peer-id to disconnect from it.
response_tx.send(Some(peer)).await.unwrap();
}

async fn request_one_peer(peer: PeerId, req: Vec<u8>, mut request_response_tx: RequestResponseSender) -> PeerResponse {
// Use the internal receiver to receive a response to this request.
let (internal_response_tx, internal_response_rx) = oneshot::channel();
Expand Down Expand Up @@ -724,6 +747,7 @@ fn start_gossipsub(
let mut announce_interval = Ticker::new_with_next(ANNOUNCE_INTERVAL, ANNOUNCE_INITIAL_DELAY);
let mut listening = false;

let (timestamp_tx, mut timestamp_rx) = futures::channel::mpsc::unbounded();
let polling_fut = poll_fn(move |cx: &mut Context| {
loop {
match swarm.behaviour_mut().cmd_rx.poll_next_unpin(cx) {
Expand All @@ -733,11 +757,28 @@ fn start_gossipsub(
}
}

while let Poll::Ready(Some(Some(peer_id))) = timestamp_rx.poll_next_unpin(cx) {
println!("Peer '{}' has incorrect time, disconnecting from it.", peer_id);
swarm.disconnect_peer_id(peer_id).expect("TODO");
let peer_list: Vec<_> = swarm.connected_peers().collect();
dbg!(peer_list);
}

loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
debug!("Swarm event {:?}", event);

if let SwarmEvent::ConnectionEstablished { peer_id, .. } = &event {
println!("dbg: validating time");
let future = validate_peer_time(
*peer_id,
timestamp_tx.clone(),
swarm.behaviour().core.request_response.sender(),
);
swarm.behaviour().spawn(future);
}

if let SwarmEvent::Behaviour(event) = event {
if swarm.behaviour_mut().netid != DEFAULT_NETID {
if let AdexBehaviourEvent::Floodsub(FloodsubEvent::Message(message)) = &event {
Expand Down

0 comments on commit afe2212

Please sign in to comment.