diff --git a/Cargo.lock b/Cargo.lock
index 56d5e96..422c4fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,7 +1,5 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
-
 [[package]]
 name = "addr2line"
 version = "0.16.0"
diff --git a/contrib/tools/mscompiler/Cargo.lock b/contrib/tools/mscompiler/Cargo.lock
index 6164040..2242717 100644
--- a/contrib/tools/mscompiler/Cargo.lock
+++ b/contrib/tools/mscompiler/Cargo.lock
@@ -1,7 +1,5 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
-
 [[package]]
 name = "base64"
 version = "0.13.0"
diff --git a/contrib/tools/txbuilder/Cargo.lock b/contrib/tools/txbuilder/Cargo.lock
index 3609481..d4e7eb4 100644
--- a/contrib/tools/txbuilder/Cargo.lock
+++ b/contrib/tools/txbuilder/Cargo.lock
@@ -1,7 +1,5 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
-
 [[package]]
 name = "base64"
 version = "0.13.0"
diff --git a/src/bitcoind/interface.rs b/src/bitcoind/interface.rs
index 09dd8e8..1cbd38d 100644
--- a/src/bitcoind/interface.rs
+++ b/src/bitcoind/interface.rs
@@ -413,6 +413,38 @@ impl BitcoinD {
         self.make_node_request_failible("sendrawtransaction", &params!(tx_hex))
             .map(|_| ())
     }
+
+    pub fn get_block_stats(&self, blockhash: BlockHash) -> Result<BlockStats, BitcoindError> {
+        let res = self.make_node_request(
+            "getblockheader",
+            &params!(Json::String(blockhash.to_string()),),
+        );
+        let confirmations = res
+            .get("confirmations")
+            .map(|a| a.as_i64())
+            .flatten()
+            .expect("Invalid confirmations in `getblockheader` response: not an i64")
+            as i32;
+        let previous_block_str = res
+            .get("previousblockhash")
+            .map(|a| a.as_str())
+            .flatten()
+            .expect("Invalid previousblockhash in `getblockheader` response: not a string");
+        let previous_blockhash = BlockHash::from_str(previous_block_str)
+            .expect("Invalid previousblockhash hex in `getblockheader` response");
+        let height = res
+            .get("height")
+            .map(|a| a.as_i64())
+            .flatten()
+            .expect("Invalid height in `getblockheader` response: not an i64")
+            as i32;
+        Ok(BlockStats {
+            confirmations,
+            previous_blockhash,
+            height,
+            blockhash,
+        })
+    }
 }
 
 /// Info about bitcoind's sync state
@@ -436,3 +468,11 @@ pub struct UtxoInfo {
     pub bestblock: BlockHash,
     pub value: Amount,
 }
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct BlockStats {
+    pub confirmations: i32,
+    pub previous_blockhash: BlockHash,
+    pub blockhash: BlockHash,
+    pub height: i32,
+}
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 6db615e..cf18bc6 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -291,6 +291,10 @@ pub fn db_blank_vaults(db_path: &path::Path) -> Result<Vec<DbVault>, DatabaseError> {
         |row| row.try_into(),
     )
 }
+pub fn db_vaults(db_path: &path::Path) -> Result<Vec<DbVault>, DatabaseError> {
+    db_query(db_path, "SELECT * FROM vaults", [], |row| row.try_into())
+}
+
 
 /// Get a list of all vaults we noticed were unvaulted
 pub fn db_unvaulted_vaults(db_path: &path::Path) -> Result, DatabaseError> {
@@ -342,6 +346,23 @@ pub fn db_cancel_signatures(
     db_sigs_by_type(db_path, vault_id, SigTxType::Cancel)
 }
 
+pub fn db_update_heights_after_reorg(
+    db_path: &path::Path,
+    ancestor_height: i32,
+) -> Result<(), DatabaseError> {
+    db_exec(&db_path, |db_tx| {
+        db_tx.execute(
+            "UPDATE vaults SET unvault_height = NULL WHERE unvault_height > (?1)",
+            params![ancestor_height],
+        )?;
+        db_tx.execute(
+            "UPDATE vaults SET spent_height = NULL WHERE spent_height > (?1)",
+            params![ancestor_height],
+        )?;
+        Ok(())
+    })
+}
+
 // Create the db file with RW permissions only for the user
 fn create_db_file(db_path: &path::Path) -> Result<(), DatabaseError> {
     let mut options = fs::OpenOptions::new();
diff --git a/src/plugins.rs b/src/plugins.rs
index 98a70f2..f04b170 100644
--- a/src/plugins.rs
+++ b/src/plugins.rs
@@ -111,26 +111,10 @@ impl Plugin {
         self.path.to_string_lossy()
     }
 
-    /// Takes updates about our vaults' status and returns which should be Canceled, if any.
-    ///
-    /// This will start a plugin process and write a JSON request to its stding containing:
-    /// - the block height of the last block
-    /// - the info of vaults that were unvaulted
-    /// - the deposit outpoint of the vaults that were succesfully spent
-    /// - the deposit outpoint of the unvaulted vaults that were revaulted
-    /// It will then read a JSON response from its stdout containing a list of deposit outpoints
-    /// of vaults that should be canceled, and expect the plugin process to terminate.
-    pub fn poll(
-        &self,
-        block_height: i32,
-        block_info: &NewBlockInfo,
-    ) -> Result<Vec<OutPoint>, PluginError> {
-        let query = serde_json::json!({
-            "method": "new_block",
-            "config": self.config,
-            "block_height": block_height,
-            "block_info": block_info,
-        });
+    fn send_message<T>(&self, query: &serde_json::Value) -> Result<T, PluginError>
+    where
+        T: serde::de::DeserializeOwned,
+    {
         let mut query_ser =
             serde_json::to_vec(&query).expect("No string in map and Serialize won't fail");
         query_ser.push(b'\n');
@@ -171,11 +155,48 @@
             self.path.as_path(),
             String::from_utf8_lossy(&output.stdout)
         );
-        let resp: NewBlockResponse = serde_json::from_slice(&output.stdout)
+        let resp: T = serde_json::from_slice(&output.stdout)
             .map_err(|e| PluginError::Deserialization(self.path.clone(), e.to_string()))?;
+        Ok(resp)
+    }
+
+    /// Takes updates about our vaults' status and returns which should be Canceled, if any.
+    ///
+    /// This will start a plugin process and write a JSON request to its stdin containing:
+    /// - the block height of the last block
+    /// - the info of vaults that were unvaulted
+    /// - the deposit outpoint of the vaults that were successfully spent
+    /// - the deposit outpoint of the unvaulted vaults that were revaulted
+    /// It will then read a JSON response from its stdout containing a list of deposit outpoints
+    /// of vaults that should be canceled, and expect the plugin process to terminate.
+    pub fn poll(
+        &self,
+        block_height: i32,
+        block_info: &NewBlockInfo,
+    ) -> Result<Vec<OutPoint>, PluginError> {
+        let query = serde_json::json!({
+            "method": "new_block",
+            "config": self.config,
+            "block_height": block_height,
+            "block_info": block_info,
+        });
+        let resp: NewBlockResponse = self.send_message(&query)?;
 
         Ok(resp.revault)
     }
+
+    /// Informs the plugin that it should wipe all the information it has
+    /// regarding the block at height `block_height` and the following ones
+    pub fn invalidate_block(&self, block_height: i32) -> Result<(), PluginError> {
+        let query = serde_json::json!({
+            "method": "invalidate_block",
+            "config": self.config,
+            "block_height": block_height,
+        });
+
+        self.send_message(&query)
+    }
 }
 
 #[cfg(test)]
diff --git a/src/poller.rs b/src/poller.rs
index 15b74a3..e332499 100644
--- a/src/poller.rs
+++ b/src/poller.rs
@@ -7,7 +7,7 @@ use crate::{
     database::{
         db_blank_vaults, db_cancel_signatures, db_del_vault, db_instance, db_should_cancel_vault,
         db_should_not_cancel_vault, db_unvault_spender_confirmed, db_unvaulted_vaults,
-        db_update_tip, db_vault, schema::DbVault, DatabaseError,
+        db_update_heights_after_reorg, db_update_tip, db_vault, schema::DbVault, DatabaseError,
     },
     plugins::{NewBlockInfo, VaultInfo},
 };
@@ -128,6 +128,7 @@ fn manage_unvaulted_vaults(
     let mut unvaulted_vaults = db_unvaulted_vaults(db_path)?;
     // We don't have all the unvaulted_vaults in db, some of them are
    // in our db_updates
+    log::error!("UNVAULTED: {:?}", unvaulted_vaults);
     let mut new_unvaulted_vaults = db_updates
         .new_unvaulted
         .clone()
@@ -135,6 +136,7 @@
         .map(|v| v.clone())
         .collect::<Vec<_>>();
     unvaulted_vaults.append(&mut new_unvaulted_vaults);
+    log::error!("UNVAULTED: {:?}", unvaulted_vaults);
     let mut updated_vaults = UpdatedVaults {
         successful_attempts: vec![],
         revaulted_attempts: vec![],
@@ -455,6 +457,7 @@ fn new_block(
     bitcoind: &BitcoinD,
     current_tip: &ChainTip,
 ) -> Result<(), PollerError> {
+    log::error!("NEW BLOCK: {:?} {:?}", current_tip.hash, current_tip.height);
     // Storing everything we need to udpate in the db, so we can update it
     // all in one batch at the end
     let mut db_updates = DbUpdates::default();
@@ -536,6 +539,52 @@
     Ok(())
 }
 
+fn handle_reorg(
+    db_path: &path::Path,
+    config: &Config,
+    bitcoind: &BitcoinD,
+    old_tip: &ChainTip,
+) -> Result<(), PollerError> {
+    // Finding the common ancestor
+    let mut stats = bitcoind.get_block_stats(old_tip.hash)?;
+    let mut ancestor = ChainTip {
+        hash: old_tip.hash,
+        height: old_tip.height,
+    };
+
+    while stats.confirmations == -1 {
+        stats = bitcoind.get_block_stats(stats.previous_blockhash)?;
+        log::error!("STATS {:?}", stats);
+        ancestor = ChainTip {
+            hash: stats.blockhash,
+            height: stats.height,
+        };
+    }
+
+    log::debug!(
+        "Unwinding the state until the common ancestor: {:?} {:?}",
+        ancestor.hash,
+        ancestor.height,
+    );
+
+    use crate::database::db_vaults;
+    log::debug!("VAULTS {:?}", db_vaults(db_path));
+    db_update_heights_after_reorg(db_path, ancestor.height)?;
+    log::debug!("VAULTS {:?}", db_vaults(db_path));
+
+    for plugin in &config.plugins {
+        plugin
+            .invalidate_block(ancestor.height + 1)
+            // TODO: figure out how to deserialize the response here
+            .unwrap_or_else(|e|
+                // FIXME: should we crash instead?
+ log::error!("Error when invalidating block in plugin: '{}'", e)); + } + db_update_tip(db_path, ancestor.height, ancestor.hash)?; + + Ok(()) +} + pub fn main_loop( db_path: &path::Path, secp: &secp256k1::Secp256k1, @@ -549,7 +598,11 @@ pub fn main_loop( if bitcoind_tip.height > db_instance.tip_blockheight { let curr_tip_hash = bitcoind.block_hash(db_instance.tip_blockheight); if db_instance.tip_blockheight != 0 && curr_tip_hash != db_instance.tip_blockhash { - panic!("No reorg handling yet"); + let tip = ChainTip { + hash: db_instance.tip_blockhash, + height: db_instance.tip_blockheight, + }; + handle_reorg(db_path, config, bitcoind, &tip)?; } match new_block(db_path, secp, config, bitcoind, &bitcoind_tip) { @@ -557,7 +610,11 @@ pub fn main_loop( Err(e) => return Err(e), } } else if bitcoind_tip.hash != db_instance.tip_blockhash { - panic!("No reorg handling yet"); + let tip = ChainTip { + hash: db_instance.tip_blockhash, + height: db_instance.tip_blockheight, + }; + handle_reorg(db_path, config, bitcoind, &tip)?; } thread::sleep(config.bitcoind_config.poll_interval_secs); diff --git a/tests/plugins/max_value_in_flight.py b/tests/plugins/max_value_in_flight.py index 2e14398..1b5330e 100755 --- a/tests/plugins/max_value_in_flight.py +++ b/tests/plugins/max_value_in_flight.py @@ -51,28 +51,41 @@ def recorded_attempts(config): req = read_request() config = req["config"] assert "data_dir" in config and "max_value" in config - block_info = req["block_info"] maybe_create_data_dir(config) assert DATASTORE_FNAME in os.listdir(config["data_dir"]) - - # First update the recorded attempts with the new and pass attempts. in_flight = recorded_attempts(config) - for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]: - del in_flight[op] - for v in block_info["new_attempts"]: - in_flight[v["deposit_outpoint"]] = v["value"] - update_in_flight(config, in_flight) - - # Did we get above the threshold? Note we might stay a bit above the threshold - # for the time that the vaults we told the WT to revault previously actually get - # their Cancel transaction confirmed. - resp = {"revault": []} - value_in_flight = sum([in_flight[k] for k in in_flight]) - while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0: - v = block_info["new_attempts"].pop(0) - resp["revault"].append(v["deposit_outpoint"]) - value_in_flight -= v["value"] - continue + + resp = {} + if req["method"] == "new_block": + block_info = req["block_info"] + # First update the recorded attempts with the new and pass attempts. + for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]: + del in_flight[op] + for v in block_info["new_attempts"]: + in_flight[v["deposit_outpoint"]] = v["value"] + update_in_flight(config, in_flight) + + # Did we get above the threshold? Note we might stay a bit above the threshold + # for the time that the vaults we told the WT to revault previously actually get + # their Cancel transaction confirmed. 
+ resp = {"revault": []} + value_in_flight = sum([in_flight[k] for k in in_flight]) + while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0: + v = block_info["new_attempts"].pop(0) + resp["revault"].append(v["deposit_outpoint"]) + value_in_flight -= v["value"] + continue + elif req["method"] == "invalidate_block": + resp = { "log": []} + resp["log"].append(in_flight) + blockheight = req["block_height"] + in_flight = [v for v in in_flight if v["block_height"] < block_height] + resp["log"].append(in_flight) + update_in_flight(config, in_flight) + else: + # TODO: maybe we should reply saying that we don't know what + # they're talking about? + pass sys.stdout.write(json.dumps(resp)) sys.stdout.flush() diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 6c0d84d..04d5452 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -5,7 +5,6 @@ from fixtures import * from test_framework.utils import COIN, DEPOSIT_ADDRESS, DERIV_INDEX, CSV - def test_max_value_in_flight(miradord, bitcoind): """ Sanity check that we are only going to revault when there is more value in flight @@ -23,6 +22,7 @@ def test_max_value_in_flight(miradord, bitcoind): [{"path": plugin_path, "config": {"data_dir": datadir, "max_value": max_value}}] ) + """ # Should get us exactly to the max value unvault_txids = [] spend_txs = [] @@ -117,6 +117,87 @@ def test_max_value_in_flight(miradord, bitcoind): # Generate two days worth of blocks, the WT should forget about this vault bitcoind.generate_block(288) miradord.wait_for_log(f"Forgetting about consumed vault at '{deposit_outpoint}'") + """ + + # Now try to spend more than the max_value + deposit_txid, deposit_outpoint = bitcoind.create_utxo( + DEPOSIT_ADDRESS, deposit_value * 10 + ) + bitcoind.generate_block(1, deposit_txid) + txs = miradord.watch_vault(deposit_outpoint, deposit_value * 10 * COIN, DERIV_INDEX) + unvault_txid = bitcoind.rpc.decoderawtransaction(txs["unvault"]["tx"])["txid"] + bitcoind.rpc.sendrawtransaction(txs["unvault"]["tx"]) + bitcoind.generate_block(1, unvault_txid) + cancel_txid = bitcoind.rpc.decoderawtransaction(txs["cancel"]["tx"])["txid"] + # The watchtower cancels + miradord.wait_for_logs( + [ + f"Got a confirmed Unvault UTXO for vault at '{deposit_outpoint}'", + f"Broadcasted Cancel transaction '{txs['cancel']['tx']}'", + f"Unvault transaction '{unvault_txid}' for vault at '{deposit_outpoint}' is still unspent", + ] + ) + bitcoind.generate_block(1, wait_for_mempool=cancel_txid) + miradord.wait_for_log( + f"Cancel transaction was confirmed for vault at '{deposit_outpoint}'" + ) + # And we reorg out the cancel + import logging + logging.error(bitcoind.rpc.getblockcount()) + logging.error(bitcoind.rpc.getbestblockhash()) + bitcoind.simple_reorg(bitcoind.rpc.getblockcount(), shift=-1) + logging.error(bitcoind.rpc.getblockcount()) + logging.error(bitcoind.rpc.getbestblockhash()) + + # The watchtower should try to cancel again + miradord.wait_for_logs( + [ + f"Got a confirmed Unvault UTXO for vault at '{deposit_outpoint}'", + f"Broadcasted Cancel transaction '{txs['cancel']['tx']}'", + f"Unvault transaction '{unvault_txid}' for vault at '{deposit_outpoint}' is still unspent", + ] + ) + bitcoind.generate_block(1, wait_for_mempool=cancel_txid) + miradord.wait_for_log( + f"Cancel transaction was confirmed for vault at '{deposit_outpoint}'" + ) + + # Prova a reorgare tutto il vault... 
+ """ + # Now the plugin should know that we have 0 in flight, + # and we can spend max_value normally + unvault_txids = [] + spend_txs = [] + for _ in range(3): + deposit_txid, deposit_outpoint = bitcoind.create_utxo( + DEPOSIT_ADDRESS, deposit_value + ) + bitcoind.generate_block(1, deposit_txid) + txs = miradord.watch_vault(deposit_outpoint, deposit_value * COIN, DERIV_INDEX) + spend_txs.append(txs["spend"]["tx"]) + unvault_txids.append( + bitcoind.rpc.decoderawtransaction(txs["unvault"]["tx"])["txid"] + ) + bitcoind.rpc.sendrawtransaction(txs["unvault"]["tx"]) + bitcoind.generate_block(1, unvault_txids[-1]) + miradord.wait_for_logs( + [ + f"Got a confirmed Unvault UTXO for vault at '{deposit_outpoint}'", + "Done processing block", + ] + ) + # The Cancel transactions have not been broadcast + assert len(bitcoind.rpc.getrawmempool()) == 0 + # If we mine a new block, they'll still won't be + bitcoind.generate_block(1) + miradord.wait_for_logs( + [f"Unvault transaction '{txid}' .* is still unspent" for txid in unvault_txids] + + ["Done processing block"] + ) + for txid in unvault_txids: + assert bitcoind.rpc.gettxout(txid, 0, True) is not None + """ + def test_multiple_plugins(miradord, bitcoind):