Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): ensure time synchronization in the network #2255

Merged
merged 25 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4343c0e
add time validation core logic
onur-ozkan Oct 30, 2024
3b69689
nit fixes
onur-ozkan Oct 31, 2024
5acfe8d
handle time gap
onur-ozkan Oct 31, 2024
82ae6d2
improve logging
onur-ozkan Oct 31, 2024
5771790
add more trackable processing logs
onur-ozkan Oct 31, 2024
f42e706
improve info log and remove debugging leftover
onur-ozkan Oct 31, 2024
b693d8c
rename `NetworkInfoRequest` to `PeerInfoRequest`
onur-ozkan Oct 31, 2024
92af285
handle recently dialed peers
onur-ozkan Nov 4, 2024
86eb068
add useful logs
onur-ozkan Nov 4, 2024
c1a6eaf
create function for pre-dial check
onur-ozkan Nov 4, 2024
e5c284d
set max cap for timestamp channel
onur-ozkan Nov 4, 2024
130af67
remove leftover
onur-ozkan Nov 4, 2024
422626c
use `Multiaddr` as key
onur-ozkan Nov 4, 2024
3859afc
Merge branch 'dev' of github.com:KomodoPlatform/komodo-defi-framework…
onur-ozkan Nov 4, 2024
1c7229c
fix p2p tests
onur-ozkan Nov 4, 2024
0691aba
update logs
onur-ozkan Nov 5, 2024
3692673
rename leftovers
onur-ozkan Nov 13, 2024
8487ab1
update timing values
onur-ozkan Nov 13, 2024
faeee43
minor fixes
onur-ozkan Nov 13, 2024
1f2950e
update pre dial check calls
onur-ozkan Nov 14, 2024
9ad57a7
apply nit fixes
onur-ozkan Nov 14, 2024
ff1a52e
Merge branch 'dev' of github.com:KomodoPlatform/komodo-defi-framework…
onur-ozkan Dec 10, 2024
3b18529
don't update existing expiries
onur-ozkan Dec 10, 2024
7418211
revert breakage
onur-ozkan Dec 17, 2024
fd64a72
Merge branch 'dev' of github.com:KomodoPlatform/komodo-defi-framework…
onur-ozkan Dec 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mm2src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ parking_lot_core = { version = "0.6", features = ["nightly"] }
paste = "1.0"
primitive-types = "0.11.1"
rand = { version = "0.7", features = ["std", "small_rng"] }
rustc-hash = "1.1.0"
rustc-hash = "2.0"
regex = "1"
serde = "1"
serde_derive = "1"
Expand Down
4 changes: 3 additions & 1 deletion mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,11 @@ fn process_p2p_request(
response_channel: mm2_libp2p::AdexResponseChannel,
) -> P2PRequestResult<()> {
let request = decode_message::<P2PRequest>(&request)?;
log::debug!("Got P2PRequest {:?}", request);

let result = match request {
P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req),
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req),
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some),
};

let res = match result {
Expand Down
1 change: 0 additions & 1 deletion mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,6 @@ impl TryFromBytes for Uuid {
}

pub fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result<Option<Vec<u8>>, String> {
log::debug!("Got ordermatch request {:?}", request);
match request {
OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel),
OrdermatchRequest::SyncPubkeyOrderbookState { pubkey, trie_roots } => {
Expand Down
19 changes: 14 additions & 5 deletions mm2src/mm2_main/src/lp_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use mm2_libp2p::application::request_response::network_info::NetworkInfoRequest;
use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError};
use serde_json::{self as json, Value as Json};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::sync::Arc;

use crate::lp_network::{add_reserved_peer_addresses, lp_network_ports, request_peers, NetIdError, ParseAddressError,
Expand Down Expand Up @@ -170,16 +171,24 @@ struct Mm2VersionRes {
nodes: HashMap<String, String>,
}

fn process_get_version_request(ctx: MmArc) -> Result<Option<Vec<u8>>, String> {
fn process_get_version_request(ctx: MmArc) -> Result<Vec<u8>, String> {
let response = ctx.mm_version().to_string();
let encoded = try_s!(encode_message(&response));
Ok(Some(encoded))
encode_message(&response).map_err(|e| e.to_string())
}

pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result<Option<Vec<u8>>, String> {
log::debug!("Got stats request {:?}", request);
fn process_get_peer_utc_timestamp_request() -> Result<Vec<u8>, String> {
let timestamp = common::get_utc_timestamp();
let timestamp: u64 = timestamp
.try_into()
.unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", timestamp));

encode_message(&timestamp).map_err(|e| e.to_string())
}

pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result<Vec<u8>, String> {
match request {
NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx),
NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(),
}
}

Expand Down
4 changes: 3 additions & 1 deletion mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use derive_more::Display;
use http::Response;
use mm2_core::mm_ctx::{from_ctx, MmArc};
use mm2_err_handle::prelude::*;
use mm2_libp2p::behaviours::atomicdex::MAX_TIME_GAP_FOR_CONNECTED_PEER;
use mm2_libp2p::{decode_signed, encode_and_sign, pub_sub_topic, PeerId, TopicPrefix};
use mm2_number::{BigDecimal, BigRational, MmNumber, MmNumberMultiRepr};
use mm2_state_machine::storable_state_machine::StateMachineStorage;
Expand Down Expand Up @@ -155,12 +156,13 @@ pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2;
pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6;
pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY_SECS: f64 = 10.;

const MAX_STARTED_AT_DIFF: u64 = 60;
const NEGOTIATE_SEND_INTERVAL: f64 = 30.;

/// If a certain P2P message is not received, swap will be aborted after this time expires.
const NEGOTIATION_TIMEOUT_SEC: u64 = 90;

const MAX_STARTED_AT_DIFF: u64 = MAX_TIME_GAP_FOR_CONNECTED_PEER * 3;

cfg_wasm32! {
use mm2_db::indexed_db::{ConstructibleDb, DbLocked};
use saved_swap::migrate_swaps_data;
Expand Down
2 changes: 2 additions & 0 deletions mm2src/mm2_p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ void = "1.0"
futures-rustls = "0.24"
instant = "0.1.12"
libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.11", default-features = false, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] }
timed-map = { version = "1.1.1", features = ["rustc-hash"] }
tokio = { version = "1.20", default-features = false }

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures-rustls = "0.22"
instant = { version = "0.1.12", features = ["wasm-bindgen"] }
libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.11", default-features = false, features = ["identify", "floodsub", "noise", "gossipsub", "ping", "request-response", "secp256k1", "wasm-ext", "wasm-ext-websocket", "macros", "yamux"] }
timed-map = { version = "1.1.1", features = ["rustc-hash"] }

[dev-dependencies]
async-std = "1.6.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ use serde::{Deserialize, Serialize};
pub enum NetworkInfoRequest {
/// Get MM2 version of nodes added to stats collection
GetMm2Version,
/// Get UTC timestamp in seconds from the target peer
GetPeerUtcTimestamp,
}
127 changes: 120 additions & 7 deletions mm2src/mm2_p2p/src/behaviours/atomicdex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::{channel::oneshot,
use futures_rustls::rustls;
use futures_ticker::Ticker;
use instant::Duration;
use lazy_static::lazy_static;
use libp2p::core::transport::Boxed as BoxedTransport;
use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::floodsub::{Floodsub, FloodsubEvent, Topic as FloodsubTopic};
Expand All @@ -23,16 +24,20 @@ use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::iter;
use std::net::IpAddr;
use std::sync::{Mutex, MutexGuard};
use std::task::{Context, Poll};
use timed_map::{MapKind, StdClock, TimedMap};

use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, PeersExchangeResponse};
use super::ping::AdexPing;
use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour,
RequestResponseSender};
use crate::application::request_response::network_info::NetworkInfoRequest;
use crate::application::request_response::P2PRequest;
use crate::network::{get_all_network_seednodes, DEFAULT_NETID};
use crate::relay_address::{RelayAddress, RelayAddressError};
use crate::swarm_runtime::SwarmRuntime;
use crate::{NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent};
use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent};

pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash};
pub use libp2p::gossipsub::{ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent,
Expand All @@ -50,6 +55,21 @@ const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(600);
const ANNOUNCE_INITIAL_DELAY: Duration = Duration::from_secs(60);
const CHANNEL_BUF_SIZE: usize = 1024 * 8;

/// Used in time validation logic for each peer which runs immediately after the
/// `ConnectionEstablished` event.
///
/// Be careful when updating this value, we have some defaults (like for swaps)
/// depending on this.
pub const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 20;

/// Used for storing peers in [`RECENTLY_DIALED_PEERS`].
const DIAL_RETRY_DELAY: Duration = Duration::from_secs(60 * 5);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would write this 5 * 60 :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explanation: it's just semantically clearer to put the conversion factors at the end (and ordered, so for 2 days converted to seconds u would use 2 * 24 * 60 * 60)


lazy_static! {
/// Tracks recently dialed peers to avoid repeated connection attempts.
static ref RECENTLY_DIALED_PEERS: Mutex<TimedMap<StdClock, Multiaddr, ()>> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap));
}

pub const DEPRECATED_NETID_LIST: &[u16] = &[
7777, // TODO: keep it inaccessible until Q2 of 2024.
];
Expand Down Expand Up @@ -162,6 +182,24 @@ pub enum AdexBehaviourCmd {
},
}

/// Determines if a dial attempt to the remote should be made.
///
/// Returns `false` if a dial attempt to the given address has already been made,
/// in which case the caller must skip the dial attempt.
fn check_and_mark_dialed(
recently_dialed_peers: &mut MutexGuard<TimedMap<StdClock, Multiaddr, ()>>,
addr: &Multiaddr,
) -> bool {
if recently_dialed_peers.get(addr).is_some() {
info!("Connection attempt was already made recently to '{addr}'.");
return false;
}

recently_dialed_peers.insert_expirable_unchecked(addr.clone(), (), DIAL_RETRY_DELAY);

true
}

/// Returns info about directly connected peers.
pub async fn get_directly_connected_peers(mut cmd_tx: AdexCmdTx) -> HashMap<String, Vec<String>> {
let (result_tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -199,6 +237,46 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec<String> {
rx.await.expect("Tx should be present")
}

async fn validate_peer_time(peer: PeerId, mut response_tx: Sender<Option<PeerId>>, rp_sender: RequestResponseSender) {
let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp);
let encoded_request = encode_message(&request)
.expect("Static type `PeerInfoRequest::GetPeerUtcTimestamp` should never fail in serialization.");

match request_one_peer(peer, encoded_request, rp_sender).await {
PeerResponse::Ok { res } => {
if let Ok(timestamp) = decode_message::<u64>(&res) {
mariocynicys marked this conversation as resolved.
Show resolved Hide resolved
let now = common::get_utc_timestamp();
let now: u64 = now
.try_into()
.unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", now));

let diff = now.abs_diff(timestamp);

// If time diff is in the acceptable gap, end the validation here.
if diff <= MAX_TIME_GAP_FOR_CONNECTED_PEER {
debug!(
"Peer '{peer}' is within the acceptable time gap ({MAX_TIME_GAP_FOR_CONNECTED_PEER} seconds); time difference is {diff} seconds."
);
response_tx.send(None).await.unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this send in redundant. what about not sending anything in here, as the other end of the end of the channel doesn't really use the None for anything.

Copy link
Member Author

@onur-ozkan onur-ozkan Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't send anything, it will block the request/process until timeout. The channel plays a request-response style communication here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please take a second look at this. the response_tx is for the timestamp checker and not a response to some node or something (i didn't really get what u mean).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thought was awaiting on result of this channel

return;
}
};
},
other => {
error!("Unexpected response `{other:?}` from peer `{peer}`");
// TODO: Ideally, we should send `Some(peer)` to end the connection,
// but we don't want to cause a breaking change yet.
response_tx.send(None).await.unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same for this None

return;
},
}

// If the function reaches this point, this means validation has failed.
// Send the peer ID to disconnect from it.
error!("Failed to validate the time for peer `{peer}`; disconnecting.");
response_tx.send(Some(peer)).await.unwrap();
}

async fn request_one_peer(peer: PeerId, req: Vec<u8>, mut request_response_tx: RequestResponseSender) -> PeerResponse {
// Use the internal receiver to receive a response to this request.
let (internal_response_tx, internal_response_rx) = oneshot::channel();
Expand Down Expand Up @@ -711,19 +789,26 @@ fn start_gossipsub(
_ => (),
}

let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap();
for relay in bootstrap.choose_multiple(&mut rng, mesh_n) {
if !check_and_mark_dialed(&mut recently_dialed_peers, relay) {
continue;
}

match libp2p::Swarm::dial(&mut swarm, relay.clone()) {
Ok(_) => info!("Dialed {}", relay),
Err(e) => error!("Dial {:?} failed: {:?}", relay, e),
}
}
drop(recently_dialed_peers);

let mut check_connected_relays_interval =
Ticker::new_with_next(CONNECTED_RELAYS_CHECK_INTERVAL, CONNECTED_RELAYS_CHECK_INTERVAL);

let mut announce_interval = Ticker::new_with_next(ANNOUNCE_INTERVAL, ANNOUNCE_INITIAL_DELAY);
let mut listening = false;

let (timestamp_tx, mut timestamp_rx) = futures::channel::mpsc::channel(mesh_n_high);
let polling_fut = poll_fn(move |cx: &mut Context| {
loop {
match swarm.behaviour_mut().cmd_rx.poll_next_unpin(cx) {
Expand All @@ -733,11 +818,27 @@ fn start_gossipsub(
}
}

while let Poll::Ready(Some(Some(peer_id))) = timestamp_rx.poll_next_unpin(cx) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and we can make this channel transmit PeerIDs instead of Option<PeerID>s.

if swarm.disconnect_peer_id(peer_id).is_err() {
error!("Disconnection from `{peer_id}` failed unexpectedly, which should never happen.");
}
}

loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
debug!("Swarm event {:?}", event);

if let SwarmEvent::ConnectionEstablished { peer_id, .. } = &event {
info!("Validating time data for peer `{peer_id}`.");
let future = validate_peer_time(
*peer_id,
timestamp_tx.clone(),
swarm.behaviour().core.request_response.sender(),
);
swarm.behaviour().spawn(future);
}

if let SwarmEvent::Behaviour(event) = event {
if swarm.behaviour_mut().netid != DEFAULT_NETID {
if let AdexBehaviourEvent::Floodsub(FloodsubEvent::Message(message)) = &event {
Expand Down Expand Up @@ -798,19 +899,29 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses

let mut rng = rand::thread_rng();
if connected_relays.len() < mesh_n_low {
let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap();
let to_connect_num = mesh_n - connected_relays.len();
let to_connect = swarm
.behaviour_mut()
.core
.peers_exchange
.get_random_peers(to_connect_num, |peer| !connected_relays.contains(peer));
let to_connect =
swarm
.behaviour_mut()
.core
.peers_exchange
.get_random_peers(to_connect_num, |peer, addresses| {
!connected_relays.contains(peer)
&& addresses
.iter()
.any(|addr| check_and_mark_dialed(&mut recently_dialed_peers, addr))
});

// choose some random bootstrap addresses to connect if peers exchange returned not enough peers
if to_connect.len() < to_connect_num {
let connect_bootstrap_num = to_connect_num - to_connect.len();
for addr in bootstrap_addresses
.iter()
.filter(|addr| !swarm.behaviour().core.gossipsub.is_connected_to_addr(addr))
.filter(|addr| {
!swarm.behaviour().core.gossipsub.is_connected_to_addr(addr)
&& check_and_mark_dialed(&mut recently_dialed_peers, addr)
})
.collect::<Vec<_>>()
.choose_multiple(&mut rng, connect_bootstrap_num)
{
Expand All @@ -824,11 +935,13 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses
if swarm.behaviour().core.gossipsub.is_connected_to_addr(&addr) {
continue;
}
mariocynicys marked this conversation as resolved.
Show resolved Hide resolved

if let Err(e) = libp2p::Swarm::dial(swarm, addr.clone()) {
error!("Peer {} address {} dial error {}", peer, addr, e);
}
}
}
drop(recently_dialed_peers);
}

if connected_relays.len() > max_n {
Expand Down
Loading
Loading