From efad71b553f2683e900aa25bcc1a3fa1d7caebef Mon Sep 17 00:00:00 2001
From: Ash Manning
Date: Mon, 29 Apr 2024 16:34:34 +0800
Subject: [PATCH] Fix networking, serialization, reorg issues, tag for release

---
 Cargo.lock       |   5 +-
 Cargo.toml       |   3 +
 app/Cargo.toml   |   2 +-
 app/app.rs       |   3 +-
 lib/Cargo.toml   |   2 +-
 lib/archive.rs   |  69 +++++++++++++++-----
 lib/mempool.rs   |  10 ++-
 lib/miner.rs     |   7 ++-
 lib/net/mod.rs   |   2 +-
 lib/net/peer.rs  | 161 +++++++++++++++++++++++++++++------------------
 lib/node.rs      | 103 +++++++++++++++++++++++-------
 lib/state.rs     |  64 +++++--------
 lib/types/mod.rs |  35 ++++++++++-
 lib/wallet.rs    |  14 +++--
 14 files changed, 310 insertions(+), 170 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5ede0b5..b89eb01 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3642,9 +3642,8 @@ dependencies = [
 
 [[package]]
 name = "rustreexo"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4852cab7dec3e1d89bb3ce30ca3461f7d6baee2143d0460c07bf1a718af68997"
+version = "0.2.0"
+source = "git+https://github.com/Ash-L2L/rustreexo.git?rev=a3ac7d3ebe9749ebd0bb34c709e7616f83d573b3#a3ac7d3ebe9749ebd0bb34c709e7616f83d573b3"
 dependencies = [
  "bitcoin_hashes 0.12.0",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index d667b03..1af047b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,9 @@ version = "0.8.0"
 git = "https://github.com/Ash-L2L/bip300301.git"
 rev = "d1da609b4c77d2b53bea2c8e922891b83612a03b"
 
+[workspace.dependencies.rustreexo]
+git = "https://github.com/Ash-L2L/rustreexo.git"
+rev = "a3ac7d3ebe9749ebd0bb34c709e7616f83d573b3"
 
 [profile.release]
 # lto = "fat"
diff --git a/app/Cargo.toml b/app/Cargo.toml
index a66bdca..cd4edfb 100644
--- a/app/Cargo.toml
+++ b/app/Cargo.toml
@@ -23,7 +23,7 @@ eframe = "0.27.1"
 futures = "0.3.30"
 human-size = "0.4.3"
 jsonrpsee = { version = "0.20.0", features = ["server"] }
-rustreexo = { version = "0.1.0" }
+rustreexo = { workspace = true }
 parking_lot = "0.12.1"
 serde = { version = "1.0.179", features = ["derive"] }
 shlex = "1.3.0"
diff --git a/app/app.rs b/app/app.rs
index 0178224..7ac3b05 100644
--- a/app/app.rs
+++ b/app/app.rs
@@ -153,9 +153,10 @@ impl App {
             .await?;
         let roots = {
             let mut accumulator = self.node.get_accumulator()?;
-            body.modify_pollard(&mut accumulator)
+            body.modify_pollard(&mut accumulator.0)
                 .map_err(Error::Utreexo)?;
             accumulator
+                .0
                 .get_roots()
                 .iter()
                 .map(|root| root.get_data())
diff --git a/lib/Cargo.toml b/lib/Cargo.toml
index d9dd8ed..2356e49 100644
--- a/lib/Cargo.toml
+++ b/lib/Cargo.toml
@@ -24,7 +24,7 @@ quinn = "0.10.1"
 rayon = "1.7.0"
 rcgen = "0.11.1"
 rustls = { version = "0.21.5", features = ["dangerous_configuration"] }
-rustreexo = { version = "0.1.0", features = ["with-serde"] }
+rustreexo = { workspace = true, features = ["with-serde"] }
 serde = { version = "1.0.179", features = ["derive"] }
 serde_json = "1.0.113"
 sha256 = "1.2.2"
diff --git a/lib/archive.rs b/lib/archive.rs
index 0f9d50d..aff0c13 100644
--- a/lib/archive.rs
+++ b/lib/archive.rs
@@ -1,12 +1,9 @@
 use std::cmp::Ordering;
 
 use fallible_iterator::FallibleIterator;
-use heed::{
-    types::{OwnedType, SerdeBincode},
-    Database, RoTxn, RwTxn,
-};
+use heed::{types::SerdeBincode, Database, RoTxn, RwTxn};
 
-use crate::types::{BlockHash, Body, Header};
+use crate::types::{Accumulator, BlockHash, Body, Header};
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -16,6 +13,8 @@ pub enum Error {
     InvalidPrevSideHash,
     #[error("invalid merkle root")]
     InvalidMerkleRoot,
+    #[error("no accumulator for block {0}")]
+    NoAccumulator(BlockHash),
     #[error("no block with hash {0}")]
     NoBlock(BlockHash),
     #[error("no header with hash {0}")]
@@ -26,25 +25,50 @@
 #[derive(Clone)]
 pub struct Archive {
+    accumulators: Database<SerdeBincode<BlockHash>, SerdeBincode<Accumulator>>,
     headers: Database<SerdeBincode<BlockHash>, SerdeBincode<Header>>,
     bodies: Database<SerdeBincode<BlockHash>, SerdeBincode<Body>>,
-    hash_to_height: Database<SerdeBincode<BlockHash>, OwnedType<u32>>,
+    hash_to_height: Database<SerdeBincode<BlockHash>, SerdeBincode<u32>>,
 }
 
 impl Archive {
-    pub const NUM_DBS: u32 = 3;
+    pub const NUM_DBS: u32 = 4;
 
     pub fn new(env: &heed::Env) -> Result<Self, Error> {
+        let accumulators = env.create_database(Some("accumulators"))?;
         let headers = env.create_database(Some("headers"))?;
         let bodies = env.create_database(Some("bodies"))?;
         let hash_to_height = env.create_database(Some("hash_to_height"))?;
         Ok(Self {
+            accumulators,
             headers,
             bodies,
             hash_to_height,
         })
     }
 
+    pub fn try_get_accumulator(
+        &self,
+        rotxn: &RoTxn,
+        block_hash: BlockHash,
+    ) -> Result<Option<Accumulator>, Error> {
+        if block_hash == BlockHash::default() {
+            Ok(Some(Accumulator::default()))
+        } else {
+            let accumulator = self.accumulators.get(rotxn, &block_hash)?;
+            Ok(accumulator)
+        }
+    }
+
+    pub fn get_accumulator(
+        &self,
+        rotxn: &RoTxn,
+        block_hash: BlockHash,
+    ) -> Result<Accumulator, Error> {
+        self.try_get_accumulator(rotxn, block_hash)?
+            .ok_or(Error::NoAccumulator(block_hash))
+    }
+
     pub fn try_get_header(
         &self,
         rotxn: &RoTxn,
         block_hash: BlockHash,
@@ -104,6 +128,17 @@ impl Archive {
             .ok_or(Error::NoHeight(block_hash))
     }
 
+    /// Store the accumulator for a block. The header must already exist.
+    pub fn put_accumulator(
+        &self,
+        rwtxn: &mut RwTxn,
+        block_hash: BlockHash,
+        accumulator: &Accumulator,
+    ) -> Result<(), Error> {
+        self.accumulators.put(rwtxn, &block_hash, accumulator)?;
+        Ok(())
+    }
+
     /// Store a block body. The header must already exist.
     pub fn put_body(
         &self,
@@ -159,30 +194,30 @@ impl Archive {
     ) -> Result<BlockHash, Error> {
         let mut height0 = self.get_height(rotxn, block_hash0)?;
         let mut height1 = self.get_height(rotxn, block_hash1)?;
-        let mut header0 = self.get_header(rotxn, block_hash0)?;
-        let mut header1 = self.get_header(rotxn, block_hash1)?;
+        let mut header0 = self.try_get_header(rotxn, block_hash0)?;
+        let mut header1 = self.try_get_header(rotxn, block_hash1)?;
         // Find respective ancestors of block_hash0 and block_hash1 with height
         // equal to min(height0, height1)
         loop {
             match height0.cmp(&height1) {
                 Ordering::Less => {
-                    block_hash1 = header1.prev_side_hash;
-                    header1 = self.get_header(rotxn, block_hash1)?;
+                    block_hash1 = header1.unwrap().prev_side_hash;
+                    header1 = self.try_get_header(rotxn, block_hash1)?;
                     height1 -= 1;
                 }
                 Ordering::Greater => {
-                    block_hash0 = header0.prev_side_hash;
-                    header0 = self.get_header(rotxn, block_hash0)?;
+                    block_hash0 = header0.unwrap().prev_side_hash;
+                    header0 = self.try_get_header(rotxn, block_hash0)?;
                     height0 -= 1;
                 }
                 Ordering::Equal => {
                     if block_hash0 == block_hash1 {
                         return Ok(block_hash0);
                     } else {
-                        block_hash0 = header0.prev_side_hash;
-                        block_hash1 = header1.prev_side_hash;
-                        header0 = self.get_header(rotxn, block_hash0)?;
-                        header1 = self.get_header(rotxn, block_hash1)?;
+                        block_hash0 = header0.unwrap().prev_side_hash;
+                        block_hash1 = header1.unwrap().prev_side_hash;
+                        header0 = self.try_get_header(rotxn, block_hash0)?;
+                        header1 = self.try_get_header(rotxn, block_hash1)?;
                         height0 -= 1;
                         height1 -= 1;
                     }
diff --git a/lib/mempool.rs b/lib/mempool.rs
index 6c53b49..250c67c 100644
--- a/lib/mempool.rs
+++ b/lib/mempool.rs
@@ -1,9 +1,8 @@
 use std::collections::VecDeque;
 
 use heed::{types::SerdeBincode, Database, RoTxn, RwTxn};
-use rustreexo::accumulator::pollard::Pollard;
 
-use crate::types::{AuthorizedTransaction, OutPoint, Txid};
+use crate::types::{Accumulator, AuthorizedTransaction, OutPoint, Txid};
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -104,7 +103,7 @@ impl MemPool {
     pub fn regenerate_proofs(
         &self,
         rwtxn: &mut RwTxn,
-        accumulator: &Pollard,
+        accumulator: &Accumulator,
     ) -> Result<(), Error> {
         let mut iter = self.transactions.iter_mut(rwtxn)?;
         while let Some(tx) = iter.next() {
@@ -115,9 +114,8 @@ impl MemPool {
                 .iter()
                 .map(|(_, utxo_hash)| utxo_hash.into())
                 .collect();
-            let (proof, _) =
-                accumulator.prove(&targets).map_err(Error::Utreexo)?;
-            tx.transaction.proof = proof;
+            tx.transaction.proof =
+                accumulator.0.prove(&targets).map_err(Error::Utreexo)?;
             unsafe { iter.put_current(&txid, &tx) }?;
         }
         Ok(())
diff --git a/lib/miner.rs b/lib/miner.rs
index 8878d06..0c8b521 100644
--- a/lib/miner.rs
+++ b/lib/miner.rs
@@ -55,7 +55,7 @@ impl Miner {
         height: u32,
         header: Header,
         body: Body,
-    ) -> Result<(), Error> {
+    ) -> Result<bitcoin::Txid, Error> {
         let str_hash_prev = header.prev_main_hash.to_string();
         let critical_hash: [u8; 32] = header.hash().into();
         let critical_hash = bitcoin::BlockHash::from_byte_array(critical_hash);
@@ -77,11 +77,12 @@ impl Miner {
             .as_str()
             .map(|s| s.to_owned())
             .ok_or(Error::InvalidJson { json: value })?;
-        let _ =
+        let txid =
             bitcoin::Txid::from_str(&txid).map_err(bip300301::Error::from)?;
+        tracing::info!("created BMM tx: {txid}");
         assert_eq!(header.merkle_root, body.compute_merkle_root());
         self.block = Some((header, body));
-        Ok(())
+        Ok(txid)
     }
 
     pub async fn confirm_bmm(
diff --git a/lib/net/mod.rs b/lib/net/mod.rs
index f8f93a0..68b74c2 100644
--- a/lib/net/mod.rs
+++ b/lib/net/mod.rs
@@ -109,7 +109,7 @@ impl Net {
         let (server, _) = make_server_endpoint(bind_addr)?;
         let client = make_client_endpoint("0.0.0.0:0".parse()?)?;
         let active_peers = Arc::new(RwLock::new(HashMap::new()));
-        let known_peers = env.create_database(Some("utxos"))?;
+        let known_peers = env.create_database(Some("known_peers"))?;
         let (peer_info_tx, peer_info_rx) = mpsc::unbounded();
         let net = Net {
             server,
diff --git a/lib/net/peer.rs b/lib/net/peer.rs
index e741257..5939a17 100644
--- a/lib/net/peer.rs
+++ b/lib/net/peer.rs
@@ -1,4 +1,4 @@
-use std::net::SocketAddr;
+use std::{collections::HashSet, net::SocketAddr};
 
 use fallible_iterator::FallibleIterator;
 use futures::{channel::mpsc, stream, StreamExt, TryFutureExt, TryStreamExt};
@@ -89,13 +89,31 @@ pub enum Request {
     },
     /// Request headers up to [`end`]
     GetHeaders {
+        /// Request headers AFTER (not including) the first ancestor found in
+        /// the specified list, if such an ancestor exists.
+        start: HashSet<BlockHash>,
         end: BlockHash,
+        /// Height is only relevant for the requester,
+        /// so serialization is skipped
+        #[serde(skip)]
+        height: Option<u32>,
     },
     PushTransaction {
         transaction: AuthorizedTransaction,
     },
 }
 
+impl Request {
+    fn expect_response(&self) -> bool {
+        match self {
+            Self::GetBlock { .. }
+            | Self::GetHeaders { .. }
+            | Self::PushTransaction { .. } => true,
+            Self::Heartbeat(_) => false,
+        }
+    }
+}
+
 #[must_use]
 #[derive(Debug)]
 pub enum Info {
@@ -156,23 +174,22 @@ impl Connection {
         Ok((request, tx))
     }
 
-    pub fn heart_beat(&self, state: &PeerState) -> Result<(), ConnectionError> {
-        let message = bincode::serialize(state)?;
-        self.0.send_datagram(bytes::Bytes::from(message))?;
-        Ok(())
-    }
-
     pub async fn request(
         &self,
         message: &Request,
-    ) -> Result<Response, ConnectionError> {
+    ) -> Result<Option<Response>, ConnectionError> {
+        let expect_response = message.expect_response();
         let (mut send, mut recv) = self.0.open_bi().await?;
         let message = bincode::serialize(message)?;
         send.write_all(&message).await?;
         send.finish().await?;
-        let response = recv.read_to_end(Self::READ_LIMIT).await?;
-        let response: Response = bincode::deserialize(&response)?;
-        Ok(response)
+        if expect_response {
+            let response = recv.read_to_end(Self::READ_LIMIT).await?;
+            let response: Response = bincode::deserialize(&response)?;
+            Ok(Some(response))
+        } else {
+            Ok(None)
+        }
     }
 }
 
@@ -199,7 +216,11 @@
         info_tx: &mpsc::UnboundedSender<Info>,
         request: Request,
     ) {
-        let resp = conn.request(&request).await;
+        let resp = match conn.request(&request).await {
+            Ok(Some(resp)) => Ok(resp),
+            Err(err) => Err(err),
+            Ok(None) => return,
+        };
         let info = resp.map(|resp| Info::Response(resp, request)).into();
         if info_tx.unbounded_send(info).is_err() {
             let addr = conn.addr();
@@ -218,6 +239,13 @@
             .map_err(ConnectionError::from)
     }
 
+    /// If a new tip is announced with greater height than the current tip:
+    /// * If the header does not exist, request it
+    /// * Verify height of the new tip.
+    /// * If the previous mainchain header does not exist, request it
+    /// * Verify BMM
+    /// * If ancestor bodies do not exist, request them
+    /// * Attempt to apply the new tip
     async fn handle_heartbeat(
         ctxt: &ConnectionContext,
         info_tx: &mpsc::UnboundedSender<Info>,
@@ -232,51 +260,52 @@
         };
         let peer_height = peer_state.block_height;
         if peer_height > tip_height {
-            let header_exists = {
+            let header = {
                 let rotxn = ctxt.env.read_txn().unwrap();
-                ctxt.archive
-                    .try_get_header(&rotxn, peer_state.tip)
-                    .unwrap()
-                    .is_some()
+                ctxt.archive.try_get_header(&rotxn, peer_state.tip)?
             };
-            if header_exists {
-                let missing_bodies: Vec<_> = {
-                    let rotxn = ctxt.env.read_txn()?;
-                    let last_common_ancestor = ctxt
-                        .archive
-                        .last_common_ancestor(&rotxn, tip, peer_state.tip)?;
-                    ctxt.archive
-                        .ancestors(&rotxn, peer_state.tip)
-                        .take_while(|block_hash| {
-                            Ok(*block_hash != last_common_ancestor)
-                        })
-                        .filter_map(|block_hash| {
-                            match ctxt
-                                .archive
-                                .try_get_body(&rotxn, block_hash)?
-                            {
-                                Some(_) => Ok(None),
-                                None => Ok(Some(block_hash)),
-                            }
-                        })
-                        .collect()?
-                };
-                if missing_bodies.is_empty() {
-                    let info = Info::NewTipReady(peer_state.tip);
-                    info_tx.unbounded_send(info)?;
-                } else {
-                    // Request missing bodies
-                    missing_bodies.into_iter().try_for_each(|block_hash| {
-                        let request = Request::GetBlock { block_hash };
-                        forward_request_tx.unbounded_send(request)
-                    })?;
-                }
-            } else {
+            let Some(_header) = header else {
                 // Request headers
                 let request = Request::GetHeaders {
+                    // TODO: provide alternative start points
+                    start: HashSet::new(),
                     end: peer_state.tip,
+                    height: Some(peer_state.block_height),
                 };
                 forward_request_tx.unbounded_send(request)?;
+                return Ok(());
+            };
+            // Verify height of new tip
+            // TODO: Check mainchain headers
+            let missing_bodies: Vec<_> = {
+                let rotxn = ctxt.env.read_txn()?;
+                let last_common_ancestor = ctxt.archive.last_common_ancestor(
+                    &rotxn,
+                    tip,
+                    peer_state.tip,
+                )?;
+                ctxt.archive
+                    .ancestors(&rotxn, peer_state.tip)
+                    .take_while(|block_hash| {
+                        Ok(*block_hash != last_common_ancestor)
+                    })
+                    .filter_map(|block_hash| {
+                        match ctxt.archive.try_get_body(&rotxn, block_hash)? {
+                            Some(_) => Ok(None),
+                            None => Ok(Some(block_hash)),
+                        }
+                    })
+                    .collect()?
+            };
+            if missing_bodies.is_empty() {
+                let info = Info::NewTipReady(peer_state.tip);
+                info_tx.unbounded_send(info)?;
+            } else {
+                // Request missing bodies
+                missing_bodies.into_iter().try_for_each(|block_hash| {
+                    let request = Request::GetBlock { block_hash };
+                    forward_request_tx.unbounded_send(request)
+                })?;
             }
         }
         Ok(())
@@ -303,18 +332,17 @@
     async fn handle_get_headers(
         ctxt: &ConnectionContext,
         response_tx: SendStream,
+        mut start: HashSet<BlockHash>,
         end: BlockHash,
     ) -> Result<(), ConnectionError> {
-        // FIXME
+        start.insert(BlockHash::default());
        let response = {
            let rotxn = ctxt.env.read_txn()?;
            if ctxt.archive.try_get_header(&rotxn, end)?.is_some() {
                let mut headers: Vec<Header> = ctxt
                    .archive
                    .ancestors(&rotxn, end)
-                    .take_while(|block_hash| {
-                        Ok(*block_hash != BlockHash::default())
-                    })
+                    .take_while(|block_hash| Ok(!start.contains(block_hash)))
                    .map(|block_hash| {
                        ctxt.archive.get_header(&rotxn, block_hash)
                    })
@@ -383,9 +411,11 @@
             Request::GetBlock { block_hash } => {
                 Self::handle_get_block(ctxt, response_tx, block_hash).await
             }
-            Request::GetHeaders { end } => {
-                Self::handle_get_headers(ctxt, response_tx, end).await
-            }
+            Request::GetHeaders {
+                start,
+                end,
+                height: _,
+            } => Self::handle_get_headers(ctxt, response_tx, start, end).await,
             Request::PushTransaction { transaction } => {
                 Self::handle_push_tx(ctxt, info_tx, response_tx, transaction)
                     .await
@@ -449,11 +479,22 @@
                        let tip_height = self.ctxt.state.get_height(&rotxn)?;
                        (tip, tip_height)
                    };
-                    let state_msg = PeerState {
+                    let heartbeat_msg = Request::Heartbeat(PeerState {
                        block_height: tip_height,
                        tip,
-                    };
-                    let () = self.connection.heart_beat(&state_msg)?;
+                    });
+                    task_set.spawn({
+                        let connection = self.connection.clone();
+                        let info_tx = self.info_tx.clone();
+                        async move {
+                            Self::send_request(
+                                &connection,
+                                &info_tx,
+                                heartbeat_msg,
+                            )
+                            .await;
+                        }
+                    });
                }
                MailboxItem::Request((request, response_tx)) => {
                    let () = Self::handle_request(
diff --git a/lib/node.rs b/lib/node.rs
index a6a5a8a..daddcba 100644
--- a/lib/node.rs
+++ b/lib/node.rs
@@ -10,8 +10,7 @@ use bip300301::bitcoin;
 use fallible_iterator::{FallibleIterator, IteratorExt};
 use futures::{stream, StreamExt, TryFutureExt};
 use heed::RwTxn;
-use rustreexo::accumulator::pollard::Pollard;
-use tokio::task::JoinHandle;
+use tokio::{task::JoinHandle, time::Duration};
 use tokio_stream::StreamNotifyClose;
 use tokio_util::task::LocalPoolHandle;
 
@@ -78,12 +77,13 @@ async fn connect_tip_(
         let () = state.connect_block(rwtxn, header, body)?;
     }
     let () = state.connect_two_way_peg_data(rwtxn, &two_way_peg_data)?;
+    let accumulator = state.get_accumulator(rwtxn)?;
     let () = archive.put_header(rwtxn, header)?;
     let () = archive.put_body(rwtxn, block_hash, body)?;
+    let () = archive.put_accumulator(rwtxn, block_hash, &accumulator)?;
     for transaction in &body.transactions {
         let () = mempool.delete(rwtxn, transaction.txid())?;
     }
-    let accumulator = state.get_accumulator(rwtxn)?;
     let () = mempool.regenerate_proofs(rwtxn, &accumulator)?;
     Ok(())
 }
@@ -105,7 +105,7 @@ async fn disconnect_tip_(
             .rev_iter(rwtxn)?
             .transpose_into_fallible()
             .find_map(|(_, (block_hash, applied_height))| {
-                if applied_height < height {
+                if applied_height < height - 1 {
                     Ok(Some(block_hash))
                 } else {
                     Ok(None)
                 }
@@ -117,6 +117,17 @@ async fn disconnect_tip_(
     };
     let () = state.disconnect_two_way_peg_data(rwtxn, &two_way_peg_data)?;
     let () = state.disconnect_tip(rwtxn, &tip_header, &tip_body)?;
+    // TODO: revert accumulator only necessary because rustreexo does not
+    // support undo yet
+    {
+        let new_tip = state.get_tip(rwtxn)?;
+        let accumulator = archive.get_accumulator(rwtxn, new_tip)?;
+        let () = state.utreexo_accumulator.put(
+            rwtxn,
+            &state::UnitKey,
+            &accumulator,
+        )?;
+    }
     for transaction in tip_body.authorized_transactions().iter().rev() {
         mempool.put(rwtxn, transaction)?;
     }
@@ -165,8 +176,8 @@ async fn reorg_to_tip(
     let common_ancestor = archive.last_common_ancestor(&rwtxn, tip, new_tip)?;
     // Check that all necessary bodies exist before disconnecting tip
     let blocks_to_apply: Vec<(Header, Body)> = archive
-        .ancestors(&rwtxn, tip)
-        .take_while(|block_hash| Ok(*block_hash != tip))
+        .ancestors(&rwtxn, new_tip)
+        .take_while(|block_hash| Ok(*block_hash != common_ancestor))
         .map(|block_hash| {
             let header = archive.get_header(&rwtxn, block_hash)?;
             let body = archive.get_body(&rwtxn, block_hash)?;
@@ -192,6 +203,7 @@ async fn reorg_to_tip(
     let tip = state.get_tip(&rwtxn)?;
     assert_eq!(tip, new_tip);
     rwtxn.commit()?;
+    tracing::info!("reorged to tip: {new_tip}");
     Ok(())
 }
 
@@ -210,6 +222,8 @@ struct NetTask {
 }
 
 impl NetTask {
+    const VERIFY_BMM_POLL_INTERVAL: Duration = Duration::from_secs(15);
+
     async fn handle_response(
         ctxt: &NetTaskContext,
         addr: SocketAddr,
@@ -234,6 +248,16 @@
                     let () = ctxt.net.remove_active_peer(addr);
                     return Ok(());
                 }
+                // verify bmm
+                // TODO: Spawn a task for this
+                let () = ctxt
+                    .drivechain
+                    .verify_bmm(
+                        &header.prev_main_hash,
+                        &block_hash.into(),
+                        Self::VERIFY_BMM_POLL_INTERVAL,
+                    )
+                    .await?;
                 if header.prev_side_hash == tip {
                     submit_block(
                         &ctxt.env,
@@ -263,7 +287,11 @@
                 },
             ) if req_block_hash == resp_block_hash => Ok(()),
             (
-                req @ PeerRequest::GetHeaders { end },
+                ref req @ PeerRequest::GetHeaders {
+                    ref start,
+                    end,
+                    height: Some(height),
+                },
                 PeerResponse::Headers(headers),
             ) => {
                 // check that the end header is as requested
@@ -273,9 +301,34 @@
                 if let Some(end_header) = headers.last() {
                     if end_header.hash() != end {
                         tracing::warn!(%addr, ?req, ?end_header, "Invalid response from peer; unexpected end header");
                         let () = ctxt.net.remove_active_peer(addr);
                         return Ok(());
                     }
+                } else {
+                    tracing::warn!(%addr, ?req, "Invalid response from peer; missing end header");
+                    let () = ctxt.net.remove_active_peer(addr);
+                    return Ok(());
                 };
+                // Must be at least one header due to previous check
+                let start_hash = headers.first().unwrap().prev_side_hash;
+                // check that the first header is after a start block
+                if !(start.contains(&start_hash)
+                    || start_hash == BlockHash::default())
+                {
+                    tracing::warn!(%addr, ?req, ?start_hash, "Invalid response from peer; invalid start hash");
+                    let () = ctxt.net.remove_active_peer(addr);
+                    return Ok(());
+                }
+                // check that the end header height is as expected
+                {
+                    let rotxn = ctxt.env.read_txn()?;
+                    let start_height =
+                        ctxt.archive.get_height(&rotxn, start_hash)?;
+                    if start_height + headers.len() as u32 != height {
+                        tracing::warn!(%addr, ?req, ?start_hash, "Invalid response from peer; invalid end height");
+                        let () = ctxt.net.remove_active_peer(addr);
+                        return Ok(());
+                    }
+                }
                 // check that headers are sequential based on prev_side_hash
-                let mut prev_side_hash = BlockHash::default();
+                let mut prev_side_hash = start_hash;
                 for header in &headers {
                     if header.prev_side_hash != prev_side_hash {
                         tracing::warn!(%addr, ?req, ?headers, "Invalid response from peer; non-sequential headers");
@@ -293,10 +346,11 @@
                         .try_get_header(&rwtxn, block_hash)?
                         .is_none()
                     {
-                        if ctxt
-                            .archive
-                            .try_get_header(&rwtxn, header.prev_side_hash)?
-                            .is_some()
+                        if header.prev_side_hash == BlockHash::default()
+                            || ctxt
+                                .archive
+                                .try_get_header(&rwtxn, header.prev_side_hash)?
+                                .is_some()
                         {
                             ctxt.archive.put_header(&mut rwtxn, &header)?;
                         } else {
@@ -308,7 +362,11 @@
                 Ok(())
             }
             (
-                PeerRequest::GetHeaders { end },
+                PeerRequest::GetHeaders {
+                    start: _,
+                    end,
+                    height: _,
+                },
                 PeerResponse::NoHeader { block_hash },
             ) if end == block_hash => Ok(()),
             (
@@ -368,7 +426,8 @@
                 MailboxItem::PeerInfo(Some((addr, Some(peer_info)))) => {
                     match peer_info {
                         PeerConnectionInfo::Error(err) => {
-                            tracing::error!(%addr, %err, "Peer connection error");
+                            let err = anyhow::anyhow!(err);
+                            tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error");
                             let () = self.ctxt.net.remove_active_peer(addr);
                         }
                         PeerConnectionInfo::NewTipReady(new_tip) => {
@@ -437,9 +496,10 @@ impl Node {
         let env = heed::EnvOpenOptions::new()
             .map_size(10 * 1024 * 1024) // 10MB
             .max_dbs(
-                crate::state::State::NUM_DBS
-                    + crate::archive::Archive::NUM_DBS
-                    + crate::mempool::MemPool::NUM_DBS,
+                State::NUM_DBS
+                    + Archive::NUM_DBS
+                    + MemPool::NUM_DBS
+                    + Net::NUM_DBS,
             )
             .open(env_path)?;
         let state = State::new(&env)?;
@@ -463,9 +523,10 @@ impl Node {
                 state: state.clone(),
             };
             || {
-                NetTask { ctxt, peer_info_rx }
-                    .run()
-                    .unwrap_or_else(|err| tracing::error!(%err))
+                NetTask { ctxt, peer_info_rx }.run().unwrap_or_else(|err| {
+                    let err = anyhow::anyhow!(err);
+                    tracing::error!(err = format!("{err:#}"))
+                })
             }
         });
         Ok(Self {
@@ -544,7 +605,7 @@ impl Node {
         Ok(utxos)
     }
 
-    pub fn get_accumulator(&self) -> Result<Pollard, Error> {
+    pub fn get_accumulator(&self) -> Result<Accumulator, Error> {
         let rotxn = self.env.read_txn()?;
         Ok(self.state.get_accumulator(&rotxn)?)
     }
diff --git a/lib/state.rs b/lib/state.rs
index 68b566c..69af0d8 100644
--- a/lib/state.rs
+++ b/lib/state.rs
@@ -1,9 +1,6 @@
 use std::collections::{BTreeMap, HashMap, HashSet};
 
-use heed::{
-    types::{OwnedType, SerdeBincode},
-    Database, RoTxn, RwTxn,
-};
+use heed::{types::SerdeBincode, Database, RoTxn, RwTxn};
 
 use bip300301::{
     bitcoin::{
@@ -11,18 +8,17 @@ use bip300301::{
     },
     TwoWayPegData, WithdrawalBundleStatus,
 };
-use rustreexo::accumulator::{
-    node_hash::NodeHash, pollard::Pollard, proof::Proof,
-};
+use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof};
 use serde::{Deserialize, Serialize};
 
 use crate::{
     authorization::Authorization,
     types::{
-        hash, Address, AggregatedWithdrawal, AuthorizedTransaction, BlockHash,
-        Body, FilledTransaction, GetAddress, GetValue, Header, InPoint,
-        MerkleRoot, OutPoint, Output, OutputContent, PointedOutput,
-        SpentOutput, Transaction, Txid, Verify, WithdrawalBundle,
+        hash, Accumulator, Address, AggregatedWithdrawal,
+        AuthorizedTransaction, BlockHash, Body, FilledTransaction, GetAddress,
+        GetValue, Header, InPoint, MerkleRoot, OutPoint, Output, OutputContent,
+        PointedOutput, SpentOutput, Transaction, Txid, Verify,
+        WithdrawalBundle,
     },
 };
@@ -110,41 +106,12 @@ impl Serialize for UnitKey {
     }
 }
 
-#[derive(Debug, Default)]
-#[repr(transparent)]
-pub struct Accumulator(pub Pollard);
-
-impl<'de> Deserialize<'de> for Accumulator {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        let bytes: &[u8] = <&[u8] as Deserialize>::deserialize(deserializer)?;
-        let pollard = Pollard::deserialize(bytes)
-            .map_err(<D::Error as serde::de::Error>::custom)?;
-        Ok(Self(pollard))
-    }
-}
-
-impl Serialize for Accumulator {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        let mut bytes = Vec::new();
-        self.0
-            .serialize(&mut bytes)
-            .map_err(<S::Error as serde::ser::Error>::custom)?;
-        bytes.serialize(serializer)
-    }
-}
-
 #[derive(Clone)]
 pub struct State {
     /// Current tip
     tip: Database<SerdeBincode<UnitKey>, SerdeBincode<BlockHash>>,
     /// Current height
-    height: Database<SerdeBincode<UnitKey>, OwnedType<u32>>,
+    height: Database<SerdeBincode<UnitKey>, SerdeBincode<u32>>,
     pub utxos: Database<SerdeBincode<OutPoint>, SerdeBincode<Output>>,
     pub stxos: Database<SerdeBincode<OutPoint>, SerdeBincode<SpentOutput>>,
     /// Pending withdrawal bundle and block height
     pub pending_withdrawal_bundle:
         Database<SerdeBincode<UnitKey>, SerdeBincode<(WithdrawalBundle, u32)>>,
     /// Mapping from block height to withdrawal bundle and status
     pub withdrawal_bundles: Database<
-        OwnedType<u32>,
+        SerdeBincode<u32>,
         SerdeBincode<(WithdrawalBundle, WithdrawalBundleStatus)>,
     >,
     /// deposit blocks and the height at which they were applied, keyed sequentially
     pub deposit_blocks:
-        Database<OwnedType<u32>, SerdeBincode<(bitcoin::BlockHash, u32)>>,
+        Database<SerdeBincode<u32>, SerdeBincode<(bitcoin::BlockHash, u32)>>,
     pub utreexo_accumulator:
         Database<SerdeBincode<UnitKey>, SerdeBincode<Accumulator>>,
 }
@@ -241,12 +208,11 @@ impl State {
     }
 
     /// Get the current Utreexo accumulator
-    pub fn get_accumulator(&self, rotxn: &RoTxn) -> Result<Pollard, Error> {
+    pub fn get_accumulator(&self, rotxn: &RoTxn) -> Result<Accumulator, Error> {
         let accumulator = self
             .utreexo_accumulator
             .get(rotxn, &UnitKey)?
-            .unwrap_or_default()
-            .0;
+            .unwrap_or_default();
         Ok(accumulator)
     }
 
@@ -262,8 +228,7 @@ impl State {
             .iter()
             .map(|(_, utxo_hash)| utxo_hash.into())
             .collect();
-        let (proof, _) = accumulator.prove(&targets).map_err(Error::Utreexo)?;
-        tx.proof = proof;
+        tx.proof = accumulator.0.prove(&targets).map_err(Error::Utreexo)?;
         Ok(())
     }
 
@@ -279,7 +244,7 @@ impl State {
         let accumulator = self.get_accumulator(rotxn)?;
         let targets: Vec<NodeHash> =
             utxos.into_iter().map(NodeHash::from).collect();
-        let (proof, _) = accumulator.prove(&targets).map_err(Error::Utreexo)?;
+        let proof = accumulator.0.prove(&targets).map_err(Error::Utreexo)?;
         Ok(proof)
     }
 
@@ -1012,6 +977,7 @@ impl State {
             .utreexo_accumulator
             .get(rwtxn, &UnitKey)?
             .unwrap_or_default();
+        tracing::debug!("Got acc");
         // New leaves for the accumulator
         let mut accumulator_add = Vec::<NodeHash>::new();
         // Accumulator leaves to delete
diff --git a/lib/types/mod.rs b/lib/types/mod.rs
index c1b2602..114c128 100644
--- a/lib/types/mod.rs
+++ b/lib/types/mod.rs
@@ -1,6 +1,6 @@
 use bip300301::bitcoin;
 use borsh::BorshSerialize;
-use rustreexo::accumulator::node_hash::NodeHash;
+use rustreexo::accumulator::{node_hash::NodeHash, pollard::Pollard};
 use serde::{Deserialize, Serialize};
 use std::{
     cmp::Ordering,
@@ -167,3 +167,36 @@ impl PartialOrd for AggregatedWithdrawal {
         Some(self.cmp(other))
     }
 }
+
+#[derive(Debug, Default)]
+#[repr(transparent)]
+pub struct Accumulator(pub Pollard);
+
+impl<'de> Deserialize<'de> for Accumulator {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let bytes: Vec<u8> =
+            <Vec<u8> as Deserialize>::deserialize(deserializer)?;
+        let pollard = Pollard::deserialize(&*bytes)
+            .inspect_err(|err| {
+                tracing::debug!("deserialize err: {err}\n bytes: {bytes:?}")
+            })
+            .map_err(<D::Error as serde::de::Error>::custom)?;
+        Ok(Self(pollard))
+    }
+}
+
+impl Serialize for Accumulator {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let mut bytes = Vec::new();
+        self.0
+            .serialize(&mut bytes)
+            .map_err(<S::Error as serde::ser::Error>::custom)?;
+        <Vec<u8> as Serialize>::serialize(&bytes, serializer)
+    }
+}
diff --git a/lib/wallet.rs b/lib/wallet.rs
index bc15de4..3317933 100644
--- a/lib/wallet.rs
+++ b/lib/wallet.rs
@@ -10,9 +10,9 @@ use heed::{
     types::{OwnedType, SerdeBincode},
     Database, RoTxn,
 };
-use rustreexo::accumulator::{node_hash::NodeHash, pollard::Pollard};
+use rustreexo::accumulator::node_hash::NodeHash;
 
-use crate::types::{hash, PointedOutput};
+use crate::types::{hash, Accumulator, PointedOutput};
 pub use crate::{
     authorization::{get_address, Authorization},
     types::{
@@ -123,7 +123,7 @@ impl Wallet {
 
     pub fn create_withdrawal(
         &self,
-        accumulator: &Pollard,
+        accumulator: &Accumulator,
         main_address: bitcoin::Address,
         value: u64,
         main_fee: u64,
@@ -141,7 +141,8 @@ impl Wallet {
             .collect();
         let input_utxo_hashes: Vec<NodeHash> =
             inputs.iter().map(|(_, hash)| hash.into()).collect();
-        let (proof, _) = accumulator
+        let proof = accumulator
+            .0
             .prove(&input_utxo_hashes)
             .map_err(Error::Utreexo)?;
         let outputs = vec![
@@ -167,7 +168,7 @@ impl Wallet {
 
     pub fn create_transaction(
         &self,
-        accumulator: &Pollard,
+        accumulator: &Accumulator,
         address: Address,
         value: u64,
         fee: u64,
@@ -183,7 +184,8 @@ impl Wallet {
             .collect();
         let input_utxo_hashes: Vec<NodeHash> =
             inputs.iter().map(|(_, hash)| hash.into()).collect();
-        let (proof, _) = accumulator
+        let proof = accumulator
+            .0
             .prove(&input_utxo_hashes)
             .map_err(Error::Utreexo)?;
         let outputs = vec![