From 3b83b936279dad30a7953016103164c306e8dfe1 Mon Sep 17 00:00:00 2001 From: Ash Manning Date: Mon, 22 Apr 2024 16:29:20 +0400 Subject: [PATCH] Improve archiving, peer block checks --- Cargo.lock | 11 +- Cargo.toml | 4 +- lib/Cargo.toml | 1 + lib/archive.rs | 253 +++++++++++++++++++++++++++++++++++++++------ lib/net/mod.rs | 117 +++++++++++---------- lib/net/peer.rs | 138 ++++++++++++++++++++++--- lib/node.rs | 269 ++++++++++++++++++++++++++++++++++++++++++++++-- lib/state.rs | 16 +-- 8 files changed, 683 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b89eb01..83a42ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -565,7 +565,7 @@ dependencies = [ [[package]] name = "bip300301" version = "0.1.1" -source = "git+https://github.com/Ash-L2L/bip300301.git?rev=d1da609b4c77d2b53bea2c8e922891b83612a03b#d1da609b4c77d2b53bea2c8e922891b83612a03b" +source = "git+https://github.com/Ash-L2L/bip300301.git?rev=e47a263c8cbf9c520aa80f71788ee28e9f11bb62#e47a263c8cbf9c520aa80f71788ee28e9f11bb62" dependencies = [ "base64 0.21.7", "bitcoin", @@ -4210,7 +4210,7 @@ dependencies = [ [[package]] name = "thunder" -version = "0.8.0" +version = "0.8.1" dependencies = [ "anyhow", "bincode", @@ -4226,6 +4226,7 @@ dependencies = [ "futures", "heed", "hex", + "jsonrpsee", "parking_lot", "quinn", "rayon", @@ -4245,7 +4246,7 @@ dependencies = [ [[package]] name = "thunder_app" -version = "0.8.0" +version = "0.8.1" dependencies = [ "anyhow", "base64 0.21.7", @@ -4278,7 +4279,7 @@ dependencies = [ [[package]] name = "thunder_app_cli" -version = "0.8.0" +version = "0.8.1" dependencies = [ "anyhow", "bip300301", @@ -4291,7 +4292,7 @@ dependencies = [ [[package]] name = "thunder_app_rpc_api" -version = "0.8.0" +version = "0.8.1" dependencies = [ "bip300301", "jsonrpsee", diff --git a/Cargo.toml b/Cargo.toml index 1af047b..9208563 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ authors = [ "Nikita Chashchinskii " ] edition = "2021" -version = "0.8.0" +version = "0.8.1" [workspace.dependencies.bip300301] git = "https://github.com/Ash-L2L/bip300301.git" -rev = "d1da609b4c77d2b53bea2c8e922891b83612a03b" +rev = "e47a263c8cbf9c520aa80f71788ee28e9f11bb62" [workspace.dependencies.rustreexo] git = "https://github.com/Ash-L2L/rustreexo.git" diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 2356e49..a46ce0c 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -19,6 +19,7 @@ fallible-iterator = "0.3.0" futures = "0.3.30" heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.4", version = "0.12.4" } hex = { version = "0.4.3", features = ["serde"] } +jsonrpsee = { version = "0.20.0" } parking_lot = "0.12.1" quinn = "0.10.1" rayon = "1.7.0" diff --git a/lib/archive.rs b/lib/archive.rs index aff0c13..9bc54b7 100644 --- a/lib/archive.rs +++ b/lib/archive.rs @@ -1,5 +1,9 @@ use std::cmp::Ordering; +use bip300301::{ + bitcoin::{self, block::Header as BitcoinHeader, hashes::Hash}, + DepositInfo, +}; use fallible_iterator::FallibleIterator; use heed::{types::SerdeBincode, Database, RoTxn, RwTxn}; @@ -7,43 +11,77 @@ use crate::types::{Accumulator, BlockHash, Body, Header}; #[derive(Debug, thiserror::Error)] pub enum Error { + #[error("invalid mainchain block hash for deposit")] + DepositInvalidMainBlockHash, #[error("heed error")] Heed(#[from] heed::Error), - #[error("invalid previous side hash")] - InvalidPrevSideHash, #[error("invalid merkle root")] InvalidMerkleRoot, + #[error("invalid previous side hash")] + InvalidPrevSideHash, #[error("no accumulator for block {0}")] NoAccumulator(BlockHash), 
#[error("no block with hash {0}")] NoBlock(BlockHash), + #[error("no BMM verification result with for block {0}")] + NoBmmVerification(BlockHash), + #[error("no deposits info for block {0}")] + NoDepositsInfo(bitcoin::BlockHash), #[error("no header with hash {0}")] NoHeader(BlockHash), #[error("no height info hash {0}")] NoHeight(BlockHash), + #[error("no mainchain header with hash {0}")] + NoMainHeader(bitcoin::BlockHash), } #[derive(Clone)] pub struct Archive { accumulators: Database, SerdeBincode>, - headers: Database, SerdeBincode
>, + block_hash_to_height: Database, SerdeBincode>, + /// BMM verification status for each header. + /// A status of false indicates that verification failed. + bmm_verifications: Database, SerdeBincode>, bodies: Database, SerdeBincode>, - hash_to_height: Database, SerdeBincode>, + /// Deposits by mainchain block, sorted first-to-last in each block + deposits: Database< + SerdeBincode, + SerdeBincode>, + >, + /// Sidechain headers. All ancestors of any header should always be present. + headers: Database, SerdeBincode
>, + /// Mainchain headers. All ancestors of any header should always be present + main_headers: + Database, SerdeBincode>, + /// Total work for mainchain headers. + /// All ancestors of any block should always be present + total_work: + Database, SerdeBincode>, } impl Archive { - pub const NUM_DBS: u32 = 4; + pub const NUM_DBS: u32 = 8; pub fn new(env: &heed::Env) -> Result { let accumulators = env.create_database(Some("accumulators"))?; - let headers = env.create_database(Some("headers"))?; + let block_hash_to_height = + env.create_database(Some("hash_to_height"))?; + let bmm_verifications = + env.create_database(Some("bmm_verifications"))?; let bodies = env.create_database(Some("bodies"))?; - let hash_to_height = env.create_database(Some("hash_to_height"))?; + let deposits = env.create_database(Some("deposits"))?; + let headers = env.create_database(Some("headers"))?; + let main_headers = env.create_database(Some("main_headers"))?; + let total_work = env.create_database(Some("total_work"))?; Ok(Self { accumulators, - headers, + block_hash_to_height, + bmm_verifications, bodies, - hash_to_height, + deposits, + headers, + main_headers, + total_work, }) } @@ -69,22 +107,50 @@ impl Archive { .ok_or(Error::NoAccumulator(block_hash)) } - pub fn try_get_header( + pub fn try_get_height( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result, Error> { - let header = self.headers.get(rotxn, &block_hash)?; - Ok(header) + ) -> Result, Error> { + if block_hash == BlockHash::default() { + Ok(Some(0)) + } else { + self.block_hash_to_height + .get(rotxn, &block_hash) + .map_err(Error::from) + } } - pub fn get_header( + pub fn get_height( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result { - self.try_get_header(rotxn, block_hash)? - .ok_or(Error::NoHeader(block_hash)) + ) -> Result { + self.try_get_height(rotxn, block_hash)? + .ok_or(Error::NoHeight(block_hash)) + } + + pub fn try_get_bmm_verification( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result, Error> { + if block_hash == BlockHash::default() { + Ok(Some(true)) + } else { + self.bmm_verifications + .get(rotxn, &block_hash) + .map_err(Error::from) + } + } + + pub fn get_bmm_verification( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result { + self.try_get_bmm_verification(rotxn, block_hash)? + .ok_or(Error::NoBmmVerification(block_hash)) } pub fn try_get_body( @@ -105,27 +171,76 @@ impl Archive { .ok_or(Error::NoBlock(block_hash)) } - pub fn try_get_height( + pub fn try_get_deposits( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result>, Error> { + let deposits = self.deposits.get(rotxn, &block_hash)?; + Ok(deposits) + } + + pub fn get_deposits( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result, Error> { + self.try_get_deposits(rotxn, block_hash)? + .ok_or(Error::NoDepositsInfo(block_hash)) + } + + pub fn try_get_header( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result, Error> { - if block_hash == BlockHash::default() { - Ok(Some(0)) - } else { - self.hash_to_height - .get(rotxn, &block_hash) - .map_err(Error::from) - } + ) -> Result, Error> { + let header = self.headers.get(rotxn, &block_hash)?; + Ok(header) } - pub fn get_height( + pub fn get_header( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result { - self.try_get_height(rotxn, block_hash)? - .ok_or(Error::NoHeight(block_hash)) + ) -> Result { + self.try_get_header(rotxn, block_hash)? 
+ .ok_or(Error::NoHeader(block_hash)) + } + + pub fn try_get_main_header( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result, Error> { + let header = self.main_headers.get(rotxn, &block_hash)?; + Ok(header) + } + + fn get_main_header( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result { + self.try_get_main_header(rotxn, block_hash)? + .ok_or(Error::NoMainHeader(block_hash)) + } + + pub fn try_get_total_work( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result, Error> { + let total_work = self.total_work.get(rotxn, &block_hash)?; + Ok(total_work) + } + + pub fn get_total_work( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result { + self.try_get_total_work(rotxn, block_hash)? + .ok_or(Error::NoMainHeader(block_hash)) } /// Store a block body. The header must already exist. @@ -139,6 +254,18 @@ impl Archive { Ok(()) } + /// Store a BMM verification result + pub fn put_bmm_verification( + &self, + rwtxn: &mut RwTxn, + block_hash: BlockHash, + verification_result: bool, + ) -> Result<(), Error> { + self.bmm_verifications + .put(rwtxn, &block_hash, &verification_result)?; + Ok(()) + } + /// Store a block body. The header must already exist. pub fn put_body( &self, @@ -154,6 +281,24 @@ impl Archive { Ok(()) } + /// Store deposit info for a block + pub fn put_deposits( + &self, + rwtxn: &mut RwTxn, + block_hash: bitcoin::BlockHash, + mut deposits: Vec, + ) -> Result<(), Error> { + deposits.sort_by_key(|deposit| deposit.tx_index); + if !deposits + .iter() + .all(|deposit| deposit.block_hash == block_hash) + { + return Err(Error::DepositInvalidMainBlockHash); + }; + self.deposits.put(rwtxn, &block_hash, &deposits)?; + Ok(()) + } + pub fn put_header( &self, rwtxn: &mut RwTxn, @@ -166,8 +311,34 @@ impl Archive { }; let height = prev_height + 1; let block_hash = header.hash(); + self.block_hash_to_height.put(rwtxn, &block_hash, &height)?; self.headers.put(rwtxn, &block_hash, header)?; - self.hash_to_height.put(rwtxn, &block_hash, &height)?; + Ok(()) + } + + pub fn put_main_header( + &self, + rwtxn: &mut RwTxn, + header: &BitcoinHeader, + ) -> Result<(), Error> { + if self + .try_get_main_header(rwtxn, header.prev_blockhash)? 
+ .is_none() + && header.prev_blockhash != bitcoin::BlockHash::all_zeros() + { + return Err(Error::NoMainHeader(header.prev_blockhash)); + } + let block_hash = header.block_hash(); + let total_work = + if header.prev_blockhash != bitcoin::BlockHash::all_zeros() { + let prev_work = + self.get_total_work(rwtxn, header.prev_blockhash)?; + prev_work + header.work() + } else { + header.work() + }; + self.main_headers.put(rwtxn, &block_hash, header)?; + self.total_work.put(rwtxn, &block_hash, &total_work)?; Ok(()) } @@ -185,6 +356,26 @@ impl Archive { } } + /// Return a fallible iterator over ancestors of a mainchain block, + /// starting with the specified block's header + pub fn main_ancestors<'a>( + &'a self, + rotxn: &'a RoTxn, + mut block_hash: bitcoin::BlockHash, + ) -> impl FallibleIterator + 'a + { + fallible_iterator::from_fn(move || { + if block_hash == bitcoin::BlockHash::all_zeros() { + Ok(None) + } else { + let res = Some(block_hash); + let header = self.get_main_header(rotxn, block_hash)?; + block_hash = header.prev_blockhash; + Ok(res) + } + }) + } + /// Find the last common ancestor of two blocks, if headers for both exist pub fn last_common_ancestor( &self, diff --git a/lib/net/mod.rs b/lib/net/mod.rs index 68b74c2..10580f1 100644 --- a/lib/net/mod.rs +++ b/lib/net/mod.rs @@ -4,6 +4,7 @@ use std::{ sync::Arc, }; +use fallible_iterator::{FallibleIterator, IteratorExt}; use futures::{channel::mpsc, StreamExt}; use heed::{ types::{OwnedType, SerdeBincode}, @@ -100,29 +101,6 @@ pub struct Net { impl Net { pub const NUM_DBS: u32 = 1; - pub fn new( - env: &heed::Env, - archive: Archive, - state: State, - bind_addr: SocketAddr, - ) -> Result<(Self, PeerInfoRx), Error> { - let (server, _) = make_server_endpoint(bind_addr)?; - let client = make_client_endpoint("0.0.0.0:0".parse()?)?; - let active_peers = Arc::new(RwLock::new(HashMap::new())); - let known_peers = env.create_database(Some("known_peers"))?; - let (peer_info_tx, peer_info_rx) = mpsc::unbounded(); - let net = Net { - server, - client, - archive, - state, - active_peers, - peer_info_tx, - known_peers, - }; - Ok((net, peer_info_rx)) - } - fn add_active_peer( &self, addr: SocketAddr, @@ -146,25 +124,14 @@ impl Net { } } - /// Accept the next incoming connection - pub async fn accept_incoming(&self, env: heed::Env) -> Result<(), Error> { - let connection = match self.server.accept().await { - Some(conn) => Connection(conn.await?), - None => return Err(Error::ServerEndpointClosed), - }; - let addr = connection.addr(); + pub fn connect_peer( + &self, + env: heed::Env, + addr: SocketAddr, + ) -> Result<(), Error> { if self.active_peers.read().contains_key(&addr) { - tracing::info!( - "already connected to {addr}, refusing duplicate connection", - ); - connection - .0 - .close(quinn::VarInt::from_u32(1), b"already connected"); - } - if connection.0.close_reason().is_some() { - return Ok(()); + return Err(Error::AlreadyConnected(addr)); } - tracing::info!("connected to peer at {addr}"); let mut rwtxn = env.write_txn()?; self.known_peers.put(&mut rwtxn, &addr, &())?; rwtxn.commit()?; @@ -174,7 +141,7 @@ impl Net { state: self.state.clone(), }; let (connection_handle, info_rx) = - peer::handle(connection_ctxt, connection); + peer::connect(self.client.clone(), addr, connection_ctxt); tokio::spawn({ let info_rx = StreamNotifyClose::new(info_rx) .map(move |info| Ok((addr, info))); @@ -189,23 +156,61 @@ impl Net { Ok(()) } - /* - /// Handle an incoming connection - pub fn handle_connection( - &self, - env: heed::Env, - connection: - ) - */ 
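Aside, not part of the patch: a minimal, self-contained sketch of the fallible-iterator pattern that the new Archive::main_ancestors helper (added in lib/archive.rs above) relies on. Here a HashMap of child-to-parent links stands in for the heed main_headers database, u64 stands in for bitcoin::BlockHash, and 0 plays the role of BlockHash::all_zeros(); the real helper looks each parent up via get_main_header instead.

    use std::collections::HashMap;

    use fallible_iterator::FallibleIterator;

    // Walk a chain of parent links, yielding each hash starting from `hash`,
    // until the sentinel "zero" hash is reached. A missing parent is an error,
    // mirroring `get_main_header` in the patch.
    fn ancestors(
        parents: &HashMap<u64, u64>,
        mut hash: u64,
    ) -> impl FallibleIterator<Item = u64, Error = String> + '_ {
        fallible_iterator::from_fn(move || {
            if hash == 0 {
                Ok(None)
            } else {
                let res = hash;
                hash = *parents
                    .get(&hash)
                    .ok_or_else(|| format!("no header {hash}"))?;
                Ok(Some(res))
            }
        })
    }

    fn main() -> Result<(), String> {
        let parents = HashMap::from([(3, 2), (2, 1), (1, 0)]);
        let chain: Vec<u64> = ancestors(&parents, 3).collect()?;
        assert_eq!(chain, vec![3, 2, 1]);
        Ok(())
    }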
+ pub fn new( + env: &heed::Env, + archive: Archive, + state: State, + bind_addr: SocketAddr, + ) -> Result<(Self, PeerInfoRx), Error> { + let (server, _) = make_server_endpoint(bind_addr)?; + let client = make_client_endpoint("0.0.0.0:0".parse()?)?; + let active_peers = Arc::new(RwLock::new(HashMap::new())); + let known_peers = env.create_database(Some("known_peers"))?; + let (peer_info_tx, peer_info_rx) = mpsc::unbounded(); + let net = Net { + server, + client, + archive, + state, + active_peers, + peer_info_tx, + known_peers, + }; + #[allow(clippy::let_and_return)] + let known_peers: Vec<_> = { + let rotxn = env.read_txn()?; + let known_peers = net + .known_peers + .iter(&rotxn)? + .transpose_into_fallible() + .collect()?; + known_peers + }; + let () = known_peers.into_iter().try_for_each(|(peer_addr, _)| { + net.connect_peer(env.clone(), peer_addr) + })?; + Ok((net, peer_info_rx)) + } - pub fn connect_peer( - &self, - env: heed::Env, - addr: SocketAddr, - ) -> Result<(), Error> { + /// Accept the next incoming connection + pub async fn accept_incoming(&self, env: heed::Env) -> Result<(), Error> { + let connection = match self.server.accept().await { + Some(conn) => Connection(conn.await?), + None => return Err(Error::ServerEndpointClosed), + }; + let addr = connection.addr(); if self.active_peers.read().contains_key(&addr) { - return Err(Error::AlreadyConnected(addr)); + tracing::info!( + "already connected to {addr}, refusing duplicate connection", + ); + connection + .0 + .close(quinn::VarInt::from_u32(1), b"already connected"); } + if connection.0.close_reason().is_some() { + return Ok(()); + } + tracing::info!("connected to peer at {addr}"); let mut rwtxn = env.write_txn()?; self.known_peers.put(&mut rwtxn, &addr, &())?; rwtxn.commit()?; @@ -215,7 +220,7 @@ impl Net { state: self.state.clone(), }; let (connection_handle, info_rx) = - peer::connect(self.client.clone(), addr, connection_ctxt); + peer::handle(connection_ctxt, connection); tokio::spawn({ let info_rx = StreamNotifyClose::new(info_rx) .map(move |info| Ok((addr, info))); diff --git a/lib/net/peer.rs b/lib/net/peer.rs index 5939a17..f8a0c62 100644 --- a/lib/net/peer.rs +++ b/lib/net/peer.rs @@ -1,9 +1,11 @@ use std::{collections::HashSet, net::SocketAddr}; +use bip300301::bitcoin::{self, hashes::Hash}; use fallible_iterator::FallibleIterator; use futures::{channel::mpsc, stream, StreamExt, TryFutureExt, TryStreamExt}; use quinn::{Endpoint, SendStream}; use serde::{Deserialize, Serialize}; +use thiserror::Error; use tokio::{ spawn, task::{JoinHandle, JoinSet}, @@ -17,7 +19,19 @@ use crate::{ types::{AuthorizedTransaction, BlockHash, Body, Header, Txid}, }; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, Error)] +pub enum BanReason { + #[error("BMM verification failed for block {0}")] + BmmVerificationFailed(BlockHash), + #[error("Incorrect total work for block {block_hash}: {total_work:?}")] + IncorrectTotalWork { + block_hash: BlockHash, + total_work: Option, + }, +} + +#[must_use] +#[derive(Debug, Error)] pub enum ConnectionError { #[error("archive error")] Archive(#[from] archive::Error), @@ -31,6 +45,8 @@ pub enum ConnectionError { HeartbeatTimeout, #[error("heed error")] Heed(#[from] heed::Error), + #[error("peer should be banned; {0}")] + PeerBan(#[from] BanReason), #[error("read to end error")] ReadToEnd(#[from] quinn::ReadToEndError), #[error("send datagram error")] @@ -61,6 +77,7 @@ impl From> for ConnectionError { pub struct PeerState { block_height: u32, tip: BlockHash, + total_work: Option, } 
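Aside, not part of the patch: a sketch of how the new total_work field in PeerState changes tip selection in the peer task below. A peer's tip now wins on equal height only if it is backed by strictly more mainchain work; u128 stands in for bitcoin::Work, and None (unknown work) compares below any Some value, matching the Option comparison used in the patch.

    struct TipState {
        block_height: u32,
        // Total mainchain work backing the tip, if known.
        total_work: Option<u128>,
    }

    // Prefer the peer's tip on strictly greater height, or on equal height
    // with strictly more total work.
    fn peer_tip_preferred(local: &TipState, peer: &TipState) -> bool {
        peer.block_height > local.block_height
            || (peer.block_height == local.block_height
                && peer.total_work > local.total_work)
    }

    fn main() {
        let local = TipState { block_height: 100, total_work: Some(1_000) };
        let more_work = TipState { block_height: 100, total_work: Some(1_500) };
        let unknown_work = TipState { block_height: 100, total_work: None };
        assert!(peer_tip_preferred(&local, &more_work));
        assert!(!peer_tip_preferred(&local, &unknown_work));
    }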
#[derive(Debug, Serialize, Deserialize)] @@ -118,7 +135,11 @@ impl Request { #[derive(Debug)] pub enum Info { Error(ConnectionError), - /// New tip ready (body and header exist in archive) + /// Need BMM verification for the specified blocks + NeedBmmVerification(Vec), + /// Need Mainchain ancestors for the specified block hash + NeedMainchainAncestors(bitcoin::BlockHash), + /// New tip ready (body and header exist in archive, BMM verified) NewTipReady(BlockHash), NewTransaction(AuthorizedTransaction), Response(Response, Request), @@ -243,6 +264,7 @@ impl ConnectionTask { /// * If the header does not exist, request it /// * Verify height of the new tip. /// * If the previous mainchain header does not exist, request it + /// * Verify PoW /// * Verify BMM /// * If ancestor bodies do not exist, request them /// * Attempt to apply the new tip @@ -252,19 +274,34 @@ impl ConnectionTask { forward_request_tx: &mpsc::UnboundedSender, peer_state: &PeerState, ) -> Result<(), ConnectionError> { - let (tip, tip_height) = { + let (tip, tip_height, total_work) = { let rotxn = ctxt.env.read_txn()?; let tip = ctxt.state.get_tip(&rotxn)?; let tip_height = ctxt.state.get_height(&rotxn)?; - (tip, tip_height) + let total_work = match ctxt.archive.try_get_header(&rotxn, tip)? { + None => None, + Some(header) + if header.prev_main_hash + == bitcoin::BlockHash::all_zeros() => + { + None + } + Some(header) => Some( + ctxt.archive + .get_total_work(&rotxn, header.prev_main_hash)?, + ), + }; + (tip, tip_height, total_work) }; let peer_height = peer_state.block_height; - if peer_height > tip_height { + if peer_height > tip_height + || (peer_height == tip_height && peer_state.total_work > total_work) + { let header = { - let rotxn = ctxt.env.read_txn().unwrap(); + let rotxn = ctxt.env.read_txn()?; ctxt.archive.try_get_header(&rotxn, peer_state.tip)? }; - let Some(_header) = header else { + let Some(header) = header else { // Request headers let request = Request::GetHeaders { // TODO: provide alternative start points @@ -275,15 +312,68 @@ impl ConnectionTask { forward_request_tx.unbounded_send(request)?; return Ok(()); }; - // Verify height of new tip - // TODO: Check mainchain headers - let missing_bodies: Vec<_> = { + // Check mainchain headers + let prev_main_header = { + let rotxn = ctxt.env.read_txn()?; + ctxt.archive + .try_get_main_header(&rotxn, header.prev_main_hash)? + }; + let Some(_prev_main_header) = prev_main_header else { + let info = Info::NeedMainchainAncestors(header.prev_main_hash); + info_tx.unbounded_send(info)?; + return Ok(()); + }; + // Check PoW + let prev_main_total_work = { let rotxn = ctxt.env.read_txn()?; - let last_common_ancestor = ctxt.archive.last_common_ancestor( + ctxt.archive.get_total_work(&rotxn, header.prev_main_hash)? + }; + if Some(prev_main_total_work) != peer_state.total_work { + let ban_reason = BanReason::IncorrectTotalWork { + block_hash: peer_state.tip, + total_work: peer_state.total_work, + }; + return Err(ConnectionError::PeerBan(ban_reason)); + } + let last_common_ancestor = { + let rotxn = ctxt.env.read_txn()?; + ctxt.archive.last_common_ancestor( &rotxn, tip, peer_state.tip, - )?; + )? + }; + // Verify BMM + { + let rotxn = ctxt.env.read_txn()?; + let mut missing_bmm = Vec::new(); + let mut ancestors = + ctxt.archive.ancestors(&rotxn, peer_state.tip).take_while( + |block_hash| Ok(*block_hash != last_common_ancestor), + ); + while let Some(block_hash) = ancestors.next()? { + match ctxt + .archive + .try_get_bmm_verification(&rotxn, peer_state.tip)? 
+ { + Some(false) => { + let ban_reason = + BanReason::BmmVerificationFailed(block_hash); + return Err(ConnectionError::PeerBan(ban_reason)); + } + Some(true) => (), + None => missing_bmm.push(block_hash), + } + } + if !missing_bmm.is_empty() { + missing_bmm.reverse(); + let info = Info::NeedBmmVerification(missing_bmm); + info_tx.unbounded_send(info)?; + return Ok(()); + } + }; + let missing_bodies: Vec<_> = { + let rotxn = ctxt.env.read_txn()?; ctxt.archive .ancestors(&rotxn, peer_state.tip) .take_while(|block_hash| { @@ -473,15 +563,35 @@ impl ConnectionTask { }); } MailboxItem::Heartbeat => { - let (tip, tip_height) = { + let (tip, tip_height, total_work) = { let rotxn = self.ctxt.env.read_txn()?; let tip = self.ctxt.state.get_tip(&rotxn)?; let tip_height = self.ctxt.state.get_height(&rotxn)?; - (tip, tip_height) + let total_work = match self + .ctxt + .archive + .try_get_header(&rotxn, tip)? + { + None => None, + Some(header) + if header.prev_main_hash + == bitcoin::BlockHash::all_zeros() => + { + None + } + Some(header) => { + Some(self.ctxt.archive.get_total_work( + &rotxn, + header.prev_main_hash, + )?) + } + }; + (tip, tip_height, total_work) }; let heartbeat_msg = Request::Heartbeat(PeerState { block_height: tip_height, tip, + total_work, }); task_set.spawn({ let connection = self.connection.clone(); diff --git a/lib/node.rs b/lib/node.rs index daddcba..846afc0 100644 --- a/lib/node.rs +++ b/lib/node.rs @@ -6,7 +6,14 @@ use std::{ sync::Arc, }; -use bip300301::bitcoin; +use bip300301::{ + bitcoin::{ + self, + block::{self, Header as BitcoinHeader}, + hashes::Hash, + }, + DepositInfo, +}; use fallible_iterator::{FallibleIterator, IteratorExt}; use futures::{stream, StreamExt, TryFutureExt}; use heed::RwTxn; @@ -21,7 +28,11 @@ use crate::{ self, Net, PeerConnectionInfo, PeerInfoRx, PeerRequest, PeerResponse, }, state::{self, State}, - types::*, + types::{ + Accumulator, Address, AuthorizedTransaction, BlockHash, Body, GetValue, + Header, OutPoint, Output, SpentOutput, Transaction, Txid, + WithdrawalBundle, + }, }; pub const THIS_SIDECHAIN: u8 = 9; @@ -52,6 +63,139 @@ pub enum Error { Utreexo(String), } +/// Attempt to verify bmm for the provided header, +/// and store the verification result +async fn verify_bmm( + env: &heed::Env, + archive: &Archive, + drivechain: &bip300301::Drivechain, + header: Header, +) -> Result { + use jsonrpsee::types::error::ErrorCode as JsonrpseeErrorCode; + const VERIFY_BMM_POLL_INTERVAL: Duration = Duration::from_secs(15); + let block_hash = header.hash(); + let res = { + let rotxn = env.read_txn()?; + archive.try_get_bmm_verification(&rotxn, block_hash)? 
+ }; + if let Some(res) = res { + return Ok(res); + } + let res = match drivechain + .verify_bmm( + &header.prev_main_hash, + &block_hash.into(), + VERIFY_BMM_POLL_INTERVAL, + ) + .await + { + Ok(()) => true, + Err(bip300301::Error::Jsonrpsee(jsonrpsee::core::Error::Call(err))) + if JsonrpseeErrorCode::from(err.code()) + == JsonrpseeErrorCode::ServerError(-1) => + { + false + } + Err(err) => return Err(Error::from(err)), + }; + let mut rwtxn = env.write_txn()?; + let () = archive.put_bmm_verification(&mut rwtxn, block_hash, res)?; + rwtxn.commit()?; + Ok(res) +} + +/// Request ancestor headers from the mainchain node, +/// including the specified header +async fn request_ancestor_headers( + env: &heed::Env, + archive: &Archive, + drivechain: &bip300301::Drivechain, + mut block_hash: bitcoin::BlockHash, +) -> Result<(), Error> { + let mut headers: Vec = Vec::new(); + loop { + if block_hash == bitcoin::BlockHash::all_zeros() { + break; + } else { + let rotxn = env.read_txn()?; + if archive.try_get_main_header(&rotxn, block_hash)?.is_some() { + break; + } + } + let header = drivechain.get_header(block_hash).await?; + block_hash = header.prev_blockhash; + headers.push(header); + } + if headers.is_empty() { + Ok(()) + } else { + let mut rwtxn = env.write_txn()?; + headers.into_iter().rev().try_for_each(|header| { + archive.put_main_header(&mut rwtxn, &header) + })?; + rwtxn.commit()?; + Ok(()) + } +} + +/// Request any missing two way peg data up to the specified block hash. +/// All ancestor headers must exist in the archive. +// TODO: deposits only for now +#[allow(dead_code)] +async fn request_two_way_peg_data( + env: &heed::Env, + archive: &Archive, + drivechain: &bip300301::Drivechain, + block_hash: bitcoin::BlockHash, +) -> Result<(), Error> { + // last block for which deposit info is known + let last_known_deposit_info = { + let rotxn = env.read_txn()?; + #[allow(clippy::let_and_return)] + let last_known_deposit_info = archive + .main_ancestors(&rotxn, block_hash) + .find(|block_hash| { + let deposits = archive.try_get_deposits(&rotxn, *block_hash)?; + Ok(deposits.is_some()) + })?; + last_known_deposit_info + }; + if last_known_deposit_info == Some(block_hash) { + return Ok(()); + } + let two_way_peg_data = drivechain + .get_two_way_peg_data(block_hash, last_known_deposit_info) + .await?; + let mut rwtxn = env.write_txn()?; + // Deposits by block, first-to-last within each block + let deposits_by_block: HashMap> = { + let mut deposits = HashMap::<_, Vec<_>>::new(); + two_way_peg_data.deposits.into_iter().for_each(|deposit| { + deposits + .entry(deposit.block_hash) + .or_default() + .push(deposit) + }); + let () = archive + .main_ancestors(&rwtxn, block_hash) + .take_while(|block_hash| { + Ok(last_known_deposit_info != Some(*block_hash)) + }) + .for_each(|block_hash| { + let _ = deposits.entry(block_hash).or_default(); + Ok(()) + })?; + deposits + }; + deposits_by_block + .into_iter() + .try_for_each(|(block_hash, deposits)| { + archive.put_deposits(&mut rwtxn, block_hash, deposits) + })?; + rwtxn.commit()?; + Ok(()) +} + async fn connect_tip_( rwtxn: &mut RwTxn<'_, '_>, archive: &Archive, @@ -146,6 +290,12 @@ async fn submit_block( body: &Body, ) -> Result<(), Error> { let mut rwtxn = env.write_txn()?; + if archive + .try_get_main_header(&rwtxn, header.prev_main_hash)? 
+ .is_none() + { + // Request mainchain headers + } let () = connect_tip_( &mut rwtxn, archive, drivechain, mempool, state, header, body, ) @@ -207,6 +357,7 @@ async fn reorg_to_tip( Ok(()) } +#[derive(Clone)] struct NetTaskContext { env: heed::Env, archive: Archive, @@ -248,7 +399,7 @@ impl NetTask { let () = ctxt.net.remove_active_peer(addr); return Ok(()); } - // verify bmm + // Verify BMM // TODO: Spawn a task for this let () = ctxt .drivechain @@ -295,17 +446,16 @@ impl NetTask { PeerResponse::Headers(headers), ) => { // check that the end header is as requested - if let Some(end_header) = headers.last() { - if end_header.hash() != end { - tracing::warn!(%addr, ?req, ?end_header,"Invalid response from peer; unexpected end header"); - let () = ctxt.net.remove_active_peer(addr); - return Ok(()); - } - } else { + let Some(end_header) = headers.last() else { tracing::warn!(%addr, ?req, "Invalid response from peer; missing end header"); let () = ctxt.net.remove_active_peer(addr); return Ok(()); }; + if end_header.hash() != end { + tracing::warn!(%addr, ?req, ?end_header,"Invalid response from peer; unexpected end header"); + let () = ctxt.net.remove_active_peer(addr); + return Ok(()); + } // Must be at least one header due to previous check let start_hash = headers.first().unwrap().prev_side_hash; // check that the first header is after a start block @@ -337,6 +487,57 @@ impl NetTask { } prev_side_hash = header.hash(); } + // Request mainchain headers + tokio::spawn({ + let ctxt = ctxt.clone(); + let prev_main_hash = headers.last().unwrap().prev_main_hash; + async move { + if let Err(err) = request_ancestor_headers( + &ctxt.env, + &ctxt.archive, + &ctxt.drivechain, + prev_main_hash, + ) + .await + { + let err = anyhow::anyhow!(err); + tracing::error!(%addr, err = format!("{err:#}"), "Request ancestor headers error"); + } + } + }); + // Verify BMM + tokio::spawn({ + let ctxt = ctxt.clone(); + let headers = headers.clone(); + async move { + for header in headers.clone() { + match verify_bmm( + &ctxt.env, + &ctxt.archive, + &ctxt.drivechain, + header.clone(), + ) + .await + { + Ok(true) => (), + Ok(false) => { + tracing::warn!( + %addr, + ?header, + ?headers, + "Invalid response from peer; BMM verification failed" + ); + let () = ctxt.net.remove_active_peer(addr); + break; + } + Err(err) => { + let err = anyhow::anyhow!(err); + tracing::error!(%addr, err = format!("{err:#}"), "Verify BMM error"); + } + } + } + } + }); // Store new headers let mut rwtxn = ctxt.env.write_txn()?; for header in headers { @@ -430,6 +631,54 @@ impl NetTask { tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error"); let () = self.ctxt.net.remove_active_peer(addr); } + PeerConnectionInfo::NeedBmmVerification( + block_hashes, + ) => { + let headers: Vec<_> = { + let rotxn = self.ctxt.env.read_txn()?; + block_hashes + .into_iter() + .map(|block_hash| { + self.ctxt + .archive + .get_header(&rotxn, block_hash) + }) + .transpose_into_fallible() + .collect()? 
+ }; + tokio::spawn({ + let ctxt = self.ctxt.clone(); + async move { + for header in headers { + if let Err(err) = verify_bmm( + &ctxt.env, + &ctxt.archive, + &ctxt.drivechain, + header, + ) + .await + { + let err = anyhow::anyhow!(err); + tracing::error!(%addr, err = format!("{err:#}"), "Verify BMM error") + } + } + } + }); + } + PeerConnectionInfo::NeedMainchainAncestors( + block_hash, + ) => { + tokio::spawn({ + let ctxt = self.ctxt.clone(); + async move { + let () = request_ancestor_headers(&ctxt.env, &ctxt.archive, &ctxt.drivechain, block_hash) + .unwrap_or_else(move |err| { + let err = anyhow::anyhow!(err); + tracing::error!(%addr, err = format!("{err:#}"), "Request ancestor headers error"); + }).await; + } + }); + } PeerConnectionInfo::NewTipReady(new_tip) => { let () = reorg_to_tip( &self.ctxt.env, diff --git a/lib/state.rs b/lib/state.rs index 69af0d8..3f9fc83 100644 --- a/lib/state.rs +++ b/lib/state.rs @@ -658,12 +658,12 @@ impl State { &(deposit_block_hash, block_height - 1), )?; } - for (outpoint, deposit) in &two_way_peg_data.deposits { - if let Ok(address) = deposit.address.parse() { - let outpoint = OutPoint::Deposit(*outpoint); + for deposit in &two_way_peg_data.deposits { + if let Ok(address) = deposit.output.address.parse() { + let outpoint = OutPoint::Deposit(deposit.outpoint); let output = Output { address, - content: OutputContent::Value(deposit.value), + content: OutputContent::Value(deposit.output.value), }; self.utxos.put(rwtxn, &outpoint, &output)?; let utxo_hash = hash(&PointedOutput { outpoint, output }); @@ -839,12 +839,12 @@ impl State { return Err(Error::NoDepositBlock); }; } - for (outpoint, deposit) in two_way_peg_data.deposits.iter().rev() { - if let Ok(address) = deposit.address.parse() { - let outpoint = OutPoint::Deposit(*outpoint); + for deposit in two_way_peg_data.deposits.iter().rev() { + if let Ok(address) = deposit.output.address.parse() { + let outpoint = OutPoint::Deposit(deposit.outpoint); let output = Output { address, - content: OutputContent::Value(deposit.value), + content: OutputContent::Value(deposit.output.value), }; if !self.utxos.delete(rwtxn, &outpoint)? { return Err(Error::NoUtxo { outpoint });
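Aside, not part of the patch: the lib/state.rs hunk above adapts to the updated bip300301 deposit shape, where each deposit carries its own outpoint and nests address/value under an output field, so connect/disconnect iterate deposits directly instead of over (outpoint, deposit) pairs. The sketch below uses stand-in types (not bip300301's actual definitions) to show how a batch of deposits in the new shape maps to sidechain UTXO entries.

    struct DepositOutput {
        address: String,
        value: u64,
    }

    struct DepositInfo {
        // (mainchain txid, vout) stand-in for the real OutPoint
        outpoint: (String, u32),
        output: DepositOutput,
    }

    // Collect the UTXO entries a batch of deposits would create, skipping any
    // deposit whose address is unusable (here: empty), mirroring the
    // `if let Ok(address) = ... .parse()` guard in the patch.
    fn deposit_utxos(
        deposits: &[DepositInfo],
    ) -> Vec<((String, u32), (String, u64))> {
        deposits
            .iter()
            .filter(|d| !d.output.address.is_empty())
            .map(|d| {
                (
                    d.outpoint.clone(),
                    (d.output.address.clone(), d.output.value),
                )
            })
            .collect()
    }

    fn main() {
        let deposits = vec![DepositInfo {
            outpoint: ("abc".into(), 0),
            output: DepositOutput { address: "addr".into(), value: 21_000 },
        }];
        assert_eq!(deposit_utxos(&deposits).len(), 1);
    }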