Skip to content

Commit

Permalink
WIP: Reorg handling
Browse files Browse the repository at this point in the history
Closes revault#22
  • Loading branch information
danielabrozzoni committed Mar 3, 2022
1 parent db395e8 commit b9360e3
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 46 deletions.
41 changes: 41 additions & 0 deletions src/bitcoind/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,39 @@ impl BitcoinD {
self.make_node_request_failible("sendrawtransaction", &params!(tx_hex))
.map(|_| ())
}

/// Get some stats (previousblockhash, confirmations, height) about this block
pub fn get_block_stats(&self, blockhash: BlockHash) -> Result<BlockStats, BitcoindError> {
let res = self.make_node_request(
"getblockheader",
&params!(Json::String(blockhash.to_string()),),
);
let confirmations = res
.get("confirmations")
.map(|a| a.as_i64())
.flatten()
.expect("Invalid confirmations in `getblockheader` response: not an i64")
as i32;
let previous_block_str = res
.get("previousblockhash")
.map(|a| a.as_str())
.flatten()
.expect("Invalid previousblockhash in `getblockheader` response: not a string");
let previous_blockhash = BlockHash::from_str(previous_block_str)
.expect("Invalid previousblockhash hex in `getblockheader` response");
let height = res
.get("height")
.map(|a| a.as_i64())
.flatten()
.expect("Invalid height in `getblockheader` response: not an u32")
as i32;
Ok(BlockStats {
confirmations,
previous_blockhash,
height,
blockhash,
})
}
}

/// Info about bitcoind's sync state
Expand All @@ -436,3 +469,11 @@ pub struct UtxoInfo {
pub bestblock: BlockHash,
pub value: Amount,
}

#[derive(Debug, Clone, PartialEq)]
pub struct BlockStats {
pub confirmations: i32,
pub previous_blockhash: BlockHash,
pub blockhash: BlockHash,
pub height: i32,
}
19 changes: 19 additions & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,25 @@ pub fn db_cancel_signatures(
db_sigs_by_type(db_path, vault_id, SigTxType::Cancel)
}

/// Set the unvault_height and spent_height to NULL, if the unvault and spent
/// txs weren't confirmed at the ancestor_height
pub fn db_update_heights_after_reorg(
db_path: &path::Path,
ancestor_height: i32,
) -> Result<(), DatabaseError> {
db_exec(&db_path, |db_tx| {
db_tx.execute(
"UPDATE vaults SET unvault_height = NULL WHERE unvault_height > (?1)",
params![ancestor_height],
)?;
db_tx.execute(
"UPDATE vaults SET spent_height = NULL WHERE spent_height > (?1)",
params![ancestor_height],
)?;
Ok(())
})
}

// Create the db file with RW permissions only for the user
fn create_db_file(db_path: &path::Path) -> Result<(), DatabaseError> {
let mut options = fs::OpenOptions::new();
Expand Down
68 changes: 47 additions & 21 deletions src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
os::unix::fs::MetadataExt,
path,
process::{Command, Stdio},
collections::HashMap,
};

use serde::{Deserialize, Serialize, Serializer};
Expand Down Expand Up @@ -111,26 +112,14 @@ impl Plugin {
self.path.to_string_lossy()
}

/// Takes updates about our vaults' status and returns which should be Canceled, if any.
///
/// This will start a plugin process and write a JSON request to its stding containing:
/// - the block height of the last block
/// - the info of vaults that were unvaulted
/// - the deposit outpoint of the vaults that were succesfully spent
/// - the deposit outpoint of the unvaulted vaults that were revaulted
/// It will then read a JSON response from its stdout containing a list of deposit outpoints
/// of vaults that should be canceled, and expect the plugin process to terminate.
pub fn poll(
&self,
block_height: i32,
block_info: &NewBlockInfo,
) -> Result<Vec<OutPoint>, PluginError> {
let query = serde_json::json!({
"method": "new_block",
"config": self.config,
"block_height": block_height,
"block_info": block_info,
});
/// This will start a plugin process and write a JSON request to its stding containing
/// the query.
/// It will then read a JSON response from its stdout containing <T>,
/// and expect the plugin process to terminate.
fn send_message<T>(&self, query: &serde_json::Value) -> Result<T, PluginError>
where
T: serde::de::DeserializeOwned,
{
let mut query_ser =
serde_json::to_vec(&query).expect("No string in map and Serialize won't fail");
query_ser.push(b'\n');
Expand Down Expand Up @@ -171,11 +160,48 @@ impl Plugin {
self.path.as_path(),
String::from_utf8_lossy(&output.stdout)
);
let resp: NewBlockResponse = serde_json::from_slice(&output.stdout)
let resp: T = serde_json::from_slice(&output.stdout)
.map_err(|e| PluginError::Deserialization(self.path.clone(), e.to_string()))?;

Ok(resp)
}

/// Takes updates about our vaults' status and returns which should be Canceled, if any.
///
/// This will send to the plugin:
/// - the block height of the last block
/// - the info of vaults that were unvaulted
/// - the deposit outpoint of the vaults that were succesfully spent
/// - the deposit outpoint of the unvaulted vaults that were revaulted
/// and will receive a list of deposit outpoints of vaults that should be
/// canceled.
pub fn poll(
&self,
block_height: i32,
block_info: &NewBlockInfo,
) -> Result<Vec<OutPoint>, PluginError> {
let query = serde_json::json!({
"method": "new_block",
"config": self.config,
"block_height": block_height,
"block_info": block_info,
});
let resp: NewBlockResponse = self.send_message(&query)?;

Ok(resp.revault)
}

/// Informs the plugin that it should wipe every information it has
/// regarding the block at height `block_height` and the following ones
pub fn invalidate_block(&self, block_height: i32) -> Result<HashMap<(), ()>, PluginError> {
let query = serde_json::json!({
"method": "invalidate_block",
"config": self.config,
"block_height": block_height,
});

self.send_message(&query)
}
}

#[cfg(test)]
Expand Down
56 changes: 53 additions & 3 deletions src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
database::{
db_blank_vaults, db_cancel_signatures, db_del_vault, db_instance, db_should_cancel_vault,
db_should_not_cancel_vault, db_unvault_spender_confirmed, db_unvaulted_vaults,
db_update_tip, db_vault, schema::DbVault, DatabaseError,
db_update_heights_after_reorg, db_update_tip, db_vault, schema::DbVault, DatabaseError,
},
plugins::{NewBlockInfo, VaultInfo},
};
Expand Down Expand Up @@ -536,6 +536,48 @@ fn new_block(
Ok(())
}

fn handle_reorg(
db_path: &path::Path,
config: &Config,
bitcoind: &BitcoinD,
old_tip: &ChainTip,
) -> Result<(), PollerError> {
// Finding the common ancestor
let mut stats = bitcoind.get_block_stats(old_tip.hash)?;
let mut ancestor = ChainTip {
hash: old_tip.hash,
height: old_tip.height,
};

while stats.confirmations == -1 {
stats = bitcoind.get_block_stats(stats.previous_blockhash)?;
ancestor = ChainTip {
hash: stats.blockhash,
height: stats.height,
};
}

log::debug!(
"Unwinding the state until the common ancestor: {:?} {:?}",
ancestor.hash,
ancestor.height,
);

db_update_heights_after_reorg(db_path, ancestor.height)?;

for plugin in &config.plugins {
plugin
.invalidate_block(ancestor.height + 1)
.map(|_| ())
.unwrap_or_else(|e|
// FIXME: should we crash instead?
log::error!("Error when invalidating block in plugin: '{}'", e));
}
db_update_tip(db_path, ancestor.height, ancestor.hash)?;

Ok(())
}

pub fn main_loop(
db_path: &path::Path,
secp: &secp256k1::Secp256k1<impl secp256k1::Verification>,
Expand All @@ -549,7 +591,11 @@ pub fn main_loop(
if bitcoind_tip.height > db_instance.tip_blockheight {
let curr_tip_hash = bitcoind.block_hash(db_instance.tip_blockheight);
if db_instance.tip_blockheight != 0 && curr_tip_hash != db_instance.tip_blockhash {
panic!("No reorg handling yet");
let tip = ChainTip {
hash: db_instance.tip_blockhash,
height: db_instance.tip_blockheight,
};
handle_reorg(db_path, config, bitcoind, &tip)?;
}

match new_block(db_path, secp, config, bitcoind, &bitcoind_tip) {
Expand All @@ -559,7 +605,11 @@ pub fn main_loop(
Err(e) => return Err(e),
}
} else if bitcoind_tip.hash != db_instance.tip_blockhash {
panic!("No reorg handling yet");
let tip = ChainTip {
hash: db_instance.tip_blockhash,
height: db_instance.tip_blockheight,
};
handle_reorg(db_path, config, bitcoind, &tip)?;
}

thread::sleep(config.bitcoind_config.poll_interval_secs);
Expand Down
47 changes: 28 additions & 19 deletions tests/plugins/max_value_in_flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,37 @@ def recorded_attempts(config):
req = read_request()
config = req["config"]
assert "data_dir" in config and "max_value" in config
block_info = req["block_info"]
maybe_create_data_dir(config)
assert DATASTORE_FNAME in os.listdir(config["data_dir"])

# First update the recorded attempts with the new and pass attempts.
in_flight = recorded_attempts(config)
for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]:
del in_flight[op]
for v in block_info["new_attempts"]:
in_flight[v["deposit_outpoint"]] = v["value"]
update_in_flight(config, in_flight)

# Did we get above the threshold? Note we might stay a bit above the threshold
# for the time that the vaults we told the WT to revault previously actually get
# their Cancel transaction confirmed.
resp = {"revault": []}
value_in_flight = sum([in_flight[k] for k in in_flight])
while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0:
v = block_info["new_attempts"].pop(0)
resp["revault"].append(v["deposit_outpoint"])
value_in_flight -= v["value"]
continue
resp = {}
if req["method"] == "new_block":
block_info = req["block_info"]
# First update the recorded attempts with the new and pass attempts.
in_flight = recorded_attempts(config)
for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]:
del in_flight[op]
for v in block_info["new_attempts"]:
in_flight[v["deposit_outpoint"]] = v["value"]
update_in_flight(config, in_flight)

# Did we get above the threshold? Note we might stay a bit above the threshold
# for the time that the vaults we told the WT to revault previously actually get
# their Cancel transaction confirmed.
resp = {"revault": []}
value_in_flight = sum([in_flight[k] for k in in_flight])
while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0:
v = block_info["new_attempts"].pop(0)
resp["revault"].append(v["deposit_outpoint"])
value_in_flight -= v["value"]
continue
elif req["method"] == "invalidate_block":
# We don't really care
pass
else:
# TODO: maybe we should reply saying that we don't know what
# they're talking about?
pass

sys.stdout.write(json.dumps(resp))
sys.stdout.flush()
13 changes: 11 additions & 2 deletions tests/plugins/revault_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ def read_request():

if __name__ == "__main__":
req = read_request()
block_info = req["block_info"]
resp = {"revault": [v["deposit_outpoint"] for v in block_info["new_attempts"]]}
resp = {}
if req["method"] == "new_block":
block_info = req["block_info"]
resp = {"revault": [v["deposit_outpoint"] for v in block_info["new_attempts"]]}
elif req["method"] == "invalidate_block":
# We don't really care
pass
else:
# TODO: maybe we should reply saying that we don't know what
# they're talking about?
pass
sys.stdout.write(json.dumps(resp))
sys.stdout.flush()
11 changes: 10 additions & 1 deletion tests/plugins/revault_nothing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ def read_request():

if __name__ == "__main__":
req = read_request()
resp = {"revault": []}
resp = {}
if req["method"] == "new_block":
resp = {"revault": []}
elif req["method"] == "invalidate_block":
# We don't really care
pass
else:
# TODO: maybe we should reply saying that we don't know what
# they're talking about?
pass
sys.stdout.write(json.dumps(resp))
sys.stdout.flush()

0 comments on commit b9360e3

Please sign in to comment.