Skip to content

Commit

Permalink
revert breakage
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Dec 17, 2024
1 parent 3b18529 commit 7418211
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 23 deletions.
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ fn process_p2p_request(

let result = match request {
P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req),
P2PRequest::PeerInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some),
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some),
};

let res = match result {
Expand Down
33 changes: 18 additions & 15 deletions mm2src/mm2_main/src/lp_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::lock::Mutex as AsyncMutex;
use http::StatusCode;
use mm2_core::mm_ctx::{from_ctx, MmArc};
use mm2_err_handle::prelude::*;
use mm2_libp2p::application::request_response::peer_info::PeerInfoRequest;
use mm2_libp2p::application::request_response::network_info::NetworkInfoRequest;
use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError};
use serde_json::{self as json, Value as Json};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -185,10 +185,10 @@ fn process_get_peer_utc_timestamp_request() -> Result<Vec<u8>, String> {
encode_message(&timestamp).map_err(|e| e.to_string())
}

pub fn process_info_request(ctx: MmArc, request: PeerInfoRequest) -> Result<Vec<u8>, String> {
pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result<Vec<u8>, String> {
match request {
PeerInfoRequest::GetMm2Version => process_get_version_request(ctx),
PeerInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(),
NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx),
NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(),
}
}

Expand Down Expand Up @@ -307,17 +307,20 @@ async fn stat_collection_loop(ctx: MmArc, interval: f64) {
let peers: Vec<String> = peers_names.keys().cloned().collect();

let timestamp = now_sec();
let get_versions_res =
match request_peers::<String>(ctx.clone(), P2PRequest::PeerInfo(PeerInfoRequest::GetMm2Version), peers)
.await
{
Ok(res) => res,
Err(e) => {
log::error!("Error getting nodes versions from peers: {}", e);
Timer::sleep(10.).await;
continue;
},
};
let get_versions_res = match request_peers::<String>(
ctx.clone(),
P2PRequest::NetworkInfo(NetworkInfoRequest::GetMm2Version),
peers,
)
.await
{
Ok(res) => res,
Err(e) => {
log::error!("Error getting nodes versions from peers: {}", e);
Timer::sleep(10.).await;
continue;
},
};

for (peer_id, response) in get_versions_res {
let name = match peers_names.get(&peer_id.to_string()) {
Expand Down
10 changes: 7 additions & 3 deletions mm2src/mm2_p2p/src/application/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//! which are separate from other request types such as RPC requests or Gossipsub
//! messages.
pub mod network_info;
pub mod ordermatch;
pub mod peer_info;

use serde::{Deserialize, Serialize};

Expand All @@ -12,6 +12,10 @@ use serde::{Deserialize, Serialize};
pub enum P2PRequest {
/// Request for order matching.
Ordermatch(ordermatch::OrdermatchRequest),
/// Request various information from the target peer.
PeerInfo(peer_info::PeerInfoRequest),
/// Request for network information from the target peer.
///
/// TODO: This should be called `PeerInfoRequest` instead. However, renaming it
/// will introduce a breaking change in the network and is not worth it. Do this
/// renaming when there is already a breaking change in the release.
NetworkInfo(network_info::NetworkInfoRequest),
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
/// Wraps the different types of network information requests for the P2P request-response
/// protocol.
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum PeerInfoRequest {
pub enum NetworkInfoRequest {
/// Get MM2 version of nodes added to stats collection
GetMm2Version,
/// Get UTC timestamp in seconds from the target peer
Expand Down
12 changes: 9 additions & 3 deletions mm2src/mm2_p2p/src/behaviours/atomicdex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest,
use super::ping::AdexPing;
use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour,
RequestResponseSender};
use crate::application::request_response::peer_info::PeerInfoRequest;
use crate::application::request_response::network_info::NetworkInfoRequest;
use crate::application::request_response::P2PRequest;
use crate::network::{get_all_network_seednodes, DEFAULT_NETID};
use crate::relay_address::{RelayAddress, RelayAddressError};
Expand Down Expand Up @@ -238,7 +238,7 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec<String> {
}

async fn validate_peer_time(peer: PeerId, mut response_tx: Sender<Option<PeerId>>, rp_sender: RequestResponseSender) {
let request = P2PRequest::PeerInfo(PeerInfoRequest::GetPeerUtcTimestamp);
let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp);
let encoded_request = encode_message(&request)
.expect("Static type `PeerInfoRequest::GetPeerUtcTimestamp` should never fail in serialization.");

Expand All @@ -262,7 +262,13 @@ async fn validate_peer_time(peer: PeerId, mut response_tx: Sender<Option<PeerId>
}
};
},
other => error!("Unexpected response `{other:?}` from peer `{peer}`"),
other => {
error!("Unexpected response `{other:?}` from peer `{peer}`");
// TODO: Ideally, we should send `Some(peer)` to end the connection,
// but we don't want to cause a breaking change yet.
response_tx.send(None).await.unwrap();
return;
},
}

// If the function reaches this point, this means validation has failed.
Expand Down

0 comments on commit 7418211

Please sign in to comment.