Skip to content

Commit

Permalink
Add request and response ID to every p2p message and route messages a… (
Browse files Browse the repository at this point in the history
kaspanet#311)

* increase initial difficulty 100x

* Add request and response ID to every p2p message and route messages accordingly

* Add protocol v6

* Some fixes

* Address review comments

* Add BLANK_ROUTE_ID

---------

Co-authored-by: Michael Sutton <[email protected]>
  • Loading branch information
someone235 and michaelsutton authored Nov 2, 2023
1 parent d66bd6c commit 5556789
Show file tree
Hide file tree
Showing 18 changed files with 388 additions and 86 deletions.
14 changes: 7 additions & 7 deletions consensus/core/src/config/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ pub const TESTNET_GENESIS: GenesisBlock = GenesisBlock {

pub const TESTNET11_GENESIS: GenesisBlock = GenesisBlock {
hash: Hash::from_bytes([
0xf0, 0xa3, 0x15, 0x04, 0xfb, 0x4b, 0xf8, 0x83, 0xd0, 0xdd, 0x2a, 0x75, 0x1c, 0x7d, 0xac, 0x06, 0x1a, 0xb4, 0xc6, 0x33, 0x00,
0xc5, 0x4f, 0xd2, 0x64, 0xd8, 0xe3, 0x06, 0x99, 0xda, 0x51, 0x54,
0x5a, 0x90, 0xf8, 0x71, 0x09, 0x32, 0x3d, 0x61, 0x41, 0xff, 0x51, 0x04, 0xa2, 0xd5, 0xf8, 0xd8, 0x85, 0x7a, 0x6f, 0x39, 0x2e,
0xb4, 0x90, 0x5c, 0xe3, 0x55, 0x5e, 0xc9, 0x12, 0xcd, 0xfb, 0x9c,
]),
bits: 520421375, // see `gen_testnet11_genesis`
bits: 504155340, // see `gen_testnet11_genesis`
..TESTNET_GENESIS
};

Expand Down Expand Up @@ -227,11 +227,11 @@ mod tests {
let bps = Testnet11Bps::bps();
let mut genesis = TESTNET_GENESIS;
let target = kaspa_math::Uint256::from_compact_target_bits(genesis.bits);
let scaled_up_target = target * bps;
let scaled_up_bits = scaled_up_target.compact_target_bits();
genesis.bits = scaled_up_bits;
let scaled_target = target * bps / 100;
let scaled_bits = scaled_target.compact_target_bits();
genesis.bits = scaled_bits;
if genesis.bits != TESTNET11_GENESIS.bits {
panic!("Testnet 11: new bits: {}\nnew hash: {:#04x?}", scaled_up_bits, Block::from(&genesis).hash().as_bytes());
panic!("Testnet 11: new bits: {}\nnew hash: {:#04x?}", scaled_bits, Block::from(&genesis).hash().as_bytes());
}
}

Expand Down
10 changes: 5 additions & 5 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::flowcontext::{orphans::OrphanBlocksPool, process_queue::ProcessQueue, transactions::TransactionsSpread};
use crate::v5;
use crate::{v5, v6};
use async_trait::async_trait;
use futures::future::join_all;
use kaspa_addressmanager::AddressManager;
Expand Down Expand Up @@ -54,7 +54,7 @@ use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use uuid::Uuid;

/// The P2P protocol version. Currently the only one supported.
const PROTOCOL_VERSION: u32 = 5;
const PROTOCOL_VERSION: u32 = 6;

/// See `check_orphan_resolution_range`
const BASELINE_ORPHAN_RESOLUTION_RANGE: u32 = 5;
Expand Down Expand Up @@ -573,8 +573,8 @@ impl ConnectionInitializer for FlowContext {

// Register all flows according to version
let (flows, applied_protocol_version) = match peer_version.protocol_version {
PROTOCOL_VERSION => (v5::register(self.clone(), router.clone()), PROTOCOL_VERSION),
// TODO: different errors for obsolete (low version) vs unknown (high)
v if v >= PROTOCOL_VERSION => (v6::register(self.clone(), router.clone()), PROTOCOL_VERSION),
5 => (v5::register(self.clone(), router.clone()), 5),
v => return Err(ProtocolError::VersionMismatch(PROTOCOL_VERSION, v)),
};

Expand All @@ -592,7 +592,7 @@ impl ConnectionInitializer for FlowContext {
// Send and receive the ready signal
handshake.exchange_ready_messages().await?;

info!("Registering p2p flows for peer {} for protocol version {}", router, peer_version.protocol_version);
info!("Registering p2p flows for peer {} for protocol version {}", router, applied_protocol_version);

// Launch all flows. Note we launch only after the ready signal was exchanged
for flow in flows {
Expand Down
1 change: 1 addition & 0 deletions protocol/flows/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod flow_trait;
pub mod flowcontext;
pub mod service;
pub mod v5;
pub mod v6;
38 changes: 26 additions & 12 deletions protocol/flows/src/v5/blockrelay/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use kaspa_core::{debug, info};
use kaspa_hashes::Hash;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, dequeue_with_timeout, make_message,
dequeue, dequeue_with_timeout, make_message, make_request,
pb::{kaspad_message::Payload, InvRelayBlockMessage, RequestBlockLocatorMessage, RequestRelayBlocksMessage},
IncomingRoute, Router,
IncomingRoute, Router, SharedIncomingRoute,
};
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::mpsc::{error::TrySendError, Sender};
Expand All @@ -22,12 +22,12 @@ pub struct RelayInvMessage {

/// Encapsulates an incoming invs route which also receives data locally
pub struct TwoWayIncomingRoute {
incoming_route: IncomingRoute,
incoming_route: SharedIncomingRoute,
indirect_invs: VecDeque<Hash>,
}

impl TwoWayIncomingRoute {
pub fn new(incoming_route: IncomingRoute) -> Self {
pub fn new(incoming_route: SharedIncomingRoute) -> Self {
Self { incoming_route, indirect_invs: VecDeque::new() }
}

Expand Down Expand Up @@ -72,7 +72,7 @@ impl HandleRelayInvsFlow {
pub fn new(
ctx: FlowContext,
router: Arc<Router>,
invs_route: IncomingRoute,
invs_route: SharedIncomingRoute,
msg_route: IncomingRoute,
ibd_sender: Sender<Block>,
) -> Self {
Expand Down Expand Up @@ -111,7 +111,7 @@ impl HandleRelayInvsFlow {
}

// We keep the request scope alive until consensus processes the block
let Some((block, request_scope)) = self.request_block(inv.hash).await? else {
let Some((block, request_scope)) = self.request_block(inv.hash, self.msg_route.id()).await? else {
debug!("Relay block {} was already requested from another peer, continuing...", inv.hash);
continue;
};
Expand Down Expand Up @@ -184,13 +184,21 @@ impl HandleRelayInvsFlow {
}
}

async fn request_block(&mut self, requested_hash: Hash) -> Result<Option<(Block, RequestScope<Hash>)>, ProtocolError> {
async fn request_block(
&mut self,
requested_hash: Hash,
request_id: u32,
) -> Result<Option<(Block, RequestScope<Hash>)>, ProtocolError> {
// Note: the request scope is returned and should be captured until block processing is completed
let Some(request_scope) = self.ctx.try_adding_block_request(requested_hash) else {
return Ok(None);
};
self.router
.enqueue(make_message!(Payload::RequestRelayBlocks, RequestRelayBlocksMessage { hashes: vec![requested_hash.into()] }))
.enqueue(make_request!(
Payload::RequestRelayBlocks,
RequestRelayBlocksMessage { hashes: vec![requested_hash.into()] },
request_id
))
.await?;
let msg = dequeue_with_timeout!(self.msg_route, Payload::Block)?;
let block: Block = msg.try_into()?;
Expand All @@ -210,7 +218,7 @@ impl HandleRelayInvsFlow {
// Add the block to the orphan pool if it's within orphan resolution range.
// If the block is indirect it means one of its descendants was already is resolution range, so
// we can avoid the query.
if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash()).await? {
if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await? {
let hash = block.hash();
self.ctx.add_orphan(block).await;
self.enqueue_orphan_roots(consensus, hash).await;
Expand All @@ -230,11 +238,17 @@ impl HandleRelayInvsFlow {
/// mechanism or via IBD. This method sends a BlockLocator request to the peer with
/// a limit of `ctx.orphan_resolution_range`. In the response, if we know none of the hashes,
/// we should retrieve the given block `hash` via IBD. Otherwise, via unorphaning.
async fn check_orphan_resolution_range(&mut self, consensus: &ConsensusProxy, hash: Hash) -> Result<bool, ProtocolError> {
async fn check_orphan_resolution_range(
&mut self,
consensus: &ConsensusProxy,
hash: Hash,
request_id: u32,
) -> Result<bool, ProtocolError> {
self.router
.enqueue(make_message!(
.enqueue(make_request!(
Payload::RequestBlockLocator,
RequestBlockLocatorMessage { high_hash: Some(hash.into()), limit: self.ctx.orphan_resolution_range() }
RequestBlockLocatorMessage { high_hash: Some(hash.into()), limit: self.ctx.orphan_resolution_range() },
request_id
))
.await?;
let msg = dequeue_with_timeout!(self.msg_route, Payload::BlockLocator)?;
Expand Down
6 changes: 3 additions & 3 deletions protocol/flows/src/v5/blockrelay/handle_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{flow_context::FlowContext, flow_trait::Flow};
use kaspa_core::debug;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue_with_request_id, make_message, make_response,
pb::{kaspad_message::Payload, InvRelayBlockMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -36,14 +36,14 @@ impl HandleRelayBlockRequests {
// Note: in go-kaspad this was done via a dedicated one-time flow.
self.send_sink().await?;
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestRelayBlocks)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestRelayBlocks)?;
let hashes: Vec<_> = msg.try_into()?;

let session = self.ctx.consensus().unguarded_session();

for hash in hashes {
let block = session.async_get_block(hash).await?;
self.router.enqueue(make_message!(Payload::Block, (&block).into())).await?;
self.router.enqueue(make_response!(Payload::Block, (&block).into(), request_id)).await?;
debug!("relayed block with hash {} to peer {}", hash, self.router);
}
}
Expand Down
32 changes: 17 additions & 15 deletions protocol/flows/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@ use self::{
};
use crate::{flow_context::FlowContext, flow_trait::Flow};

use kaspa_p2p_lib::{KaspadMessagePayloadType, Router};
use kaspa_p2p_lib::{KaspadMessagePayloadType, Router, SharedIncomingRoute};
use std::sync::Arc;

mod address;
mod blockrelay;
mod ibd;
mod ping;
mod request_anticone;
mod request_block_locator;
mod request_headers;
mod request_ibd_blocks;
mod request_ibd_chain_block_locator;
mod request_pp_proof;
mod request_pruning_point_and_anticone;
mod request_pruning_point_utxo_set;
mod txrelay;
pub(crate) mod address;
pub(crate) mod blockrelay;
pub(crate) mod ibd;
pub(crate) mod ping;
pub(crate) mod request_anticone;
pub(crate) mod request_block_locator;
pub(crate) mod request_headers;
pub(crate) mod request_ibd_blocks;
pub(crate) mod request_ibd_chain_block_locator;
pub(crate) mod request_pp_proof;
pub(crate) mod request_pruning_point_and_anticone;
pub(crate) mod request_pruning_point_utxo_set;
pub(crate) mod txrelay;

pub fn register(ctx: FlowContext, router: Arc<Router>) -> Vec<Box<dyn Flow>> {
// IBD flow <-> invs flow channel requires no buffering hence the minimal size possible
Expand Down Expand Up @@ -60,7 +60,9 @@ pub fn register(ctx: FlowContext, router: Arc<Router>) -> Vec<Box<dyn Flow>> {
Box::new(HandleRelayInvsFlow::new(
ctx.clone(),
router.clone(),
router.subscribe_with_capacity(vec![KaspadMessagePayloadType::InvRelayBlock], ctx.block_invs_channel_size()),
SharedIncomingRoute::new(
router.subscribe_with_capacity(vec![KaspadMessagePayloadType::InvRelayBlock], ctx.block_invs_channel_size()),
),
router.subscribe(vec![KaspadMessagePayloadType::Block, KaspadMessagePayloadType::BlockLocator]),
ibd_sender,
)),
Expand Down
11 changes: 6 additions & 5 deletions protocol/flows/src/v5/request_anticone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use kaspa_core::debug;
use kaspa_hashes::Hash;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue_with_request_id, make_response,
pb::{kaspad_message::Payload, BlockHeadersMessage, DoneHeadersMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -34,7 +34,7 @@ impl HandleAnticoneRequests {

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestAnticone)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestAnticone)?;
let (block, context): (Hash, Hash) = msg.try_into()?;

debug!("received anticone request with block hash: {}, context hash: {} for peer {}", block, context, self.router);
Expand All @@ -55,12 +55,13 @@ impl HandleAnticoneRequests {
headers.sort_by(|a, b| a.blue_work.cmp(&b.blue_work));

self.router
.enqueue(make_message!(
.enqueue(make_response!(
Payload::BlockHeaders,
BlockHeadersMessage { block_headers: headers.into_iter().map(|header| header.as_ref().into()).collect() }
BlockHeadersMessage { block_headers: headers.into_iter().map(|header| header.as_ref().into()).collect() },
request_id
))
.await?;
self.router.enqueue(make_message!(Payload::DoneHeaders, DoneHeadersMessage {})).await?;
self.router.enqueue(make_response!(Payload::DoneHeaders, DoneHeadersMessage {}, request_id)).await?;
}
}
}
9 changes: 5 additions & 4 deletions protocol/flows/src/v5/request_block_locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue_with_request_id, make_response,
pb::{kaspad_message::Payload, BlockLocatorMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -33,16 +33,17 @@ impl RequestBlockLocatorFlow {

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestBlockLocator)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestBlockLocator)?;
let (high, limit) = msg.try_into()?;

let locator =
self.ctx.consensus().session().await.async_create_block_locator_from_pruning_point(high, limit as usize).await?;

self.router
.enqueue(make_message!(
.enqueue(make_response!(
Payload::BlockLocator,
BlockLocatorMessage { hashes: locator.into_iter().map(|hash| hash.into()).collect() }
BlockLocatorMessage { hashes: locator.into_iter().map(|hash| hash.into()).collect() },
request_id
))
.await?;
}
Expand Down
8 changes: 4 additions & 4 deletions protocol/flows/src/v5/request_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use kaspa_consensus_core::api::ConsensusApi;
use kaspa_hashes::Hash;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue, dequeue_with_request_id, make_response,
pb::{self, kaspad_message::Payload, BlockHeadersMessage, DoneHeadersMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -40,7 +40,7 @@ impl RequestHeadersFlow {
let max_blocks = max(MAX_BLOCKS, self.ctx.config.mergeset_size_limit as usize + 1);

loop {
let msg = dequeue!(self.incoming_route, Payload::RequestHeaders)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestHeaders)?;
let (high, mut low) = msg.try_into()?;

let consensus = self.ctx.consensus();
Expand Down Expand Up @@ -68,13 +68,13 @@ impl RequestHeadersFlow {
session.spawn_blocking(move |c| Self::get_headers_between(c, low, high, max_blocks)).await?;
debug!("Got {} header hashes above {}", block_headers.len(), low);
low = last;
self.router.enqueue(make_message!(Payload::BlockHeaders, BlockHeadersMessage { block_headers })).await?;
self.router.enqueue(make_response!(Payload::BlockHeaders, BlockHeadersMessage { block_headers }, request_id)).await?;

dequeue!(self.incoming_route, Payload::RequestNextHeaders)?;
session = consensus.session().await;
}

self.router.enqueue(make_message!(Payload::DoneHeaders, DoneHeadersMessage {})).await?;
self.router.enqueue(make_response!(Payload::DoneHeaders, DoneHeadersMessage {}, request_id)).await?;
}
}

Expand Down
8 changes: 5 additions & 3 deletions protocol/flows/src/v5/request_ibd_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{flow_context::FlowContext, flow_trait::Flow};
use kaspa_core::debug;
use kaspa_p2p_lib::{common::ProtocolError, dequeue, make_message, pb::kaspad_message::Payload, IncomingRoute, Router};
use kaspa_p2p_lib::{
common::ProtocolError, dequeue_with_request_id, make_response, pb::kaspad_message::Payload, IncomingRoute, Router,
};
use std::sync::Arc;

pub struct HandleIbdBlockRequests {
Expand All @@ -27,15 +29,15 @@ impl HandleIbdBlockRequests {

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestIbdBlocks)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestIbdBlocks)?;
let hashes: Vec<_> = msg.try_into()?;

debug!("got request for {} IBD blocks", hashes.len());
let session = self.ctx.consensus().unguarded_session();

for hash in hashes {
let block = session.async_get_block(hash).await?;
self.router.enqueue(make_message!(Payload::IbdBlock, (&block).into())).await?;
self.router.enqueue(make_response!(Payload::IbdBlock, (&block).into(), request_id)).await?;
}
}
}
Expand Down
Loading

0 comments on commit 5556789

Please sign in to comment.