Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
danielabrozzoni committed Feb 23, 2022
1 parent 580a0a3 commit d3dfc21
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 50 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions contrib/tools/mscompiler/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions contrib/tools/txbuilder/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions src/bitcoind/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,38 @@ impl BitcoinD {
self.make_node_request_failible("sendrawtransaction", &params!(tx_hex))
.map(|_| ())
}

pub fn get_block_stats(&self, blockhash: BlockHash) -> Result<BlockStats, BitcoindError> {
let res = self.make_node_request(
"getblockheader",
&params!(Json::String(blockhash.to_string()),),
);
let confirmations = res
.get("confirmations")
.map(|a| a.as_i64())
.flatten()
.expect("Invalid confirmations in `getblockheader` response: not an i64")
as i32;
let previous_block_str = res
.get("previousblockhash")
.map(|a| a.as_str())
.flatten()
.expect("Invalid previousblockhash in `getblockheader` response: not a string");
let previous_blockhash = BlockHash::from_str(previous_block_str)
.expect("Invalid previousblockhash hex in `getblockheader` response");
let height = res
.get("height")
.map(|a| a.as_i64())
.flatten()
.expect("Invalid height in `getblockheader` response: not an u32")
as i32;
Ok(BlockStats {
confirmations,
previous_blockhash,
height,
blockhash,
})
}
}

/// Info about bitcoind's sync state
Expand All @@ -436,3 +468,11 @@ pub struct UtxoInfo {
pub bestblock: BlockHash,
pub value: Amount,
}

#[derive(Debug, Clone, PartialEq)]
pub struct BlockStats {
pub confirmations: i32,
pub previous_blockhash: BlockHash,
pub blockhash: BlockHash,
pub height: i32,
}
21 changes: 21 additions & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ pub fn db_blank_vaults(db_path: &path::Path) -> Result<Vec<DbVault>, DatabaseErr
|row| row.try_into(),
)
}
pub fn db_vaults(db_path: &path::Path) -> Result<Vec<DbVault>, DatabaseError> {
db_query(db_path, "SELECT * FROM vaults", [], |row| row.try_into())
}


/// Get a list of all vaults we noticed were unvaulted
pub fn db_unvaulted_vaults(db_path: &path::Path) -> Result<Vec<DbVault>, DatabaseError> {
Expand Down Expand Up @@ -342,6 +346,23 @@ pub fn db_cancel_signatures(
db_sigs_by_type(db_path, vault_id, SigTxType::Cancel)
}

pub fn db_update_heights_after_reorg(
db_path: &path::Path,
ancestor_height: i32,
) -> Result<(), DatabaseError> {
db_exec(&db_path, |db_tx| {
db_tx.execute(
"UPDATE vaults SET unvault_height = NULL WHERE unvault_height > (?1)",
params![ancestor_height],
)?;
db_tx.execute(
"UPDATE vaults SET spent_height = NULL WHERE spent_height > (?1)",
params![ancestor_height],
)?;
Ok(())
})
}

// Create the db file with RW permissions only for the user
fn create_db_file(db_path: &path::Path) -> Result<(), DatabaseError> {
let mut options = fs::OpenOptions::new();
Expand Down
63 changes: 42 additions & 21 deletions src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,10 @@ impl Plugin {
self.path.to_string_lossy()
}

/// Takes updates about our vaults' status and returns which should be Canceled, if any.
///
/// This will start a plugin process and write a JSON request to its stding containing:
/// - the block height of the last block
/// - the info of vaults that were unvaulted
/// - the deposit outpoint of the vaults that were succesfully spent
/// - the deposit outpoint of the unvaulted vaults that were revaulted
/// It will then read a JSON response from its stdout containing a list of deposit outpoints
/// of vaults that should be canceled, and expect the plugin process to terminate.
pub fn poll(
&self,
block_height: i32,
block_info: &NewBlockInfo,
) -> Result<Vec<OutPoint>, PluginError> {
let query = serde_json::json!({
"method": "new_block",
"config": self.config,
"block_height": block_height,
"block_info": block_info,
});
fn send_message<T>(&self, query: &serde_json::Value) -> Result<T, PluginError>
where
T: serde::de::DeserializeOwned,
{
let mut query_ser =
serde_json::to_vec(&query).expect("No string in map and Serialize won't fail");
query_ser.push(b'\n');
Expand Down Expand Up @@ -171,11 +155,48 @@ impl Plugin {
self.path.as_path(),
String::from_utf8_lossy(&output.stdout)
);
let resp: NewBlockResponse = serde_json::from_slice(&output.stdout)
let resp: T = serde_json::from_slice(&output.stdout)
.map_err(|e| PluginError::Deserialization(self.path.clone(), e.to_string()))?;

Ok(resp)
}

/// Takes updates about our vaults' status and returns which should be Canceled, if any.
///
/// This will start a plugin process and write a JSON request to its stding containing:
/// - the block height of the last block
/// - the info of vaults that were unvaulted
/// - the deposit outpoint of the vaults that were succesfully spent
/// - the deposit outpoint of the unvaulted vaults that were revaulted
/// It will then read a JSON response from its stdout containing a list of deposit outpoints
/// of vaults that should be canceled, and expect the plugin process to terminate.
pub fn poll(
&self,
block_height: i32,
block_info: &NewBlockInfo,
) -> Result<Vec<OutPoint>, PluginError> {
let query = serde_json::json!({
"method": "new_block",
"config": self.config,
"block_height": block_height,
"block_info": block_info,
});
let resp: NewBlockResponse = self.send_message(&query)?;

Ok(resp.revault)
}

/// Informs the plugin that it should wipe every information it has
/// regarding the block at height `block_height` and the following ones
pub fn invalidate_block(&self, block_height: i32) -> Result<(), PluginError> {
let query = serde_json::json!({
"method": "invalidate_block",
"config": self.config,
"block_height": block_height,
});

self.send_message(&query)
}
}

#[cfg(test)]
Expand Down
63 changes: 60 additions & 3 deletions src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
database::{
db_blank_vaults, db_cancel_signatures, db_del_vault, db_instance, db_should_cancel_vault,
db_should_not_cancel_vault, db_unvault_spender_confirmed, db_unvaulted_vaults,
db_update_tip, db_vault, schema::DbVault, DatabaseError,
db_update_heights_after_reorg, db_update_tip, db_vault, schema::DbVault, DatabaseError,
},
plugins::{NewBlockInfo, VaultInfo},
};
Expand Down Expand Up @@ -128,13 +128,15 @@ fn manage_unvaulted_vaults(
let mut unvaulted_vaults = db_unvaulted_vaults(db_path)?;
// We don't have all the unvaulted_vaults in db, some of them are
// in our db_updates
log::error!("UNVAULTED: {:?}", unvaulted_vaults);
let mut new_unvaulted_vaults = db_updates
.new_unvaulted
.clone()
.values()
.map(|v| v.clone())
.collect::<Vec<_>>();
unvaulted_vaults.append(&mut new_unvaulted_vaults);
log::error!("UNVAULTED: {:?}", unvaulted_vaults);
let mut updated_vaults = UpdatedVaults {
successful_attempts: vec![],
revaulted_attempts: vec![],
Expand Down Expand Up @@ -455,6 +457,7 @@ fn new_block(
bitcoind: &BitcoinD,
current_tip: &ChainTip,
) -> Result<(), PollerError> {
log::error!("NEW BLOCK: {:?} {:?}", current_tip.hash, current_tip.height);
// Storing everything we need to udpate in the db, so we can update it
// all in one batch at the end
let mut db_updates = DbUpdates::default();
Expand Down Expand Up @@ -536,6 +539,52 @@ fn new_block(
Ok(())
}

fn handle_reorg(
db_path: &path::Path,
config: &Config,
bitcoind: &BitcoinD,
old_tip: &ChainTip,
) -> Result<(), PollerError> {
// Finding the common ancestor
let mut stats = bitcoind.get_block_stats(old_tip.hash)?;
let mut ancestor = ChainTip {
hash: old_tip.hash,
height: old_tip.height,
};

while stats.confirmations == -1 {
stats = bitcoind.get_block_stats(stats.previous_blockhash)?;
log::error!("STATS {:?}", stats);
ancestor = ChainTip {
hash: stats.blockhash,
height: stats.height,
};
}

log::debug!(
"Unwinding the state until the common ancestor: {:?} {:?}",
ancestor.hash,
ancestor.height,
);

use crate::database::db_vaults;
log::debug!("VAULTS {:?}", db_vaults(db_path));
db_update_heights_after_reorg(db_path, ancestor.height)?;
log::debug!("VAULTS {:?}", db_vaults(db_path));

for plugin in &config.plugins {
plugin
.invalidate_block(ancestor.height + 1)
// TODO: vabbuó qua non so deserializzare
.unwrap_or_else(|e|
// FIXME: should we crash instead?
log::error!("Error when invalidating block in plugin: '{}'", e));
}
db_update_tip(db_path, ancestor.height, ancestor.hash)?;

Ok(())
}

pub fn main_loop(
db_path: &path::Path,
secp: &secp256k1::Secp256k1<impl secp256k1::Verification>,
Expand All @@ -549,15 +598,23 @@ pub fn main_loop(
if bitcoind_tip.height > db_instance.tip_blockheight {
let curr_tip_hash = bitcoind.block_hash(db_instance.tip_blockheight);
if db_instance.tip_blockheight != 0 && curr_tip_hash != db_instance.tip_blockhash {
panic!("No reorg handling yet");
let tip = ChainTip {
hash: db_instance.tip_blockhash,
height: db_instance.tip_blockheight,
};
handle_reorg(db_path, config, bitcoind, &tip)?;
}

match new_block(db_path, secp, config, bitcoind, &bitcoind_tip) {
Ok(()) | Err(PollerError::TipChanged) => {}
Err(e) => return Err(e),
}
} else if bitcoind_tip.hash != db_instance.tip_blockhash {
panic!("No reorg handling yet");
let tip = ChainTip {
hash: db_instance.tip_blockhash,
height: db_instance.tip_blockheight,
};
handle_reorg(db_path, config, bitcoind, &tip)?;
}

thread::sleep(config.bitcoind_config.poll_interval_secs);
Expand Down
51 changes: 32 additions & 19 deletions tests/plugins/max_value_in_flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,41 @@ def recorded_attempts(config):
req = read_request()
config = req["config"]
assert "data_dir" in config and "max_value" in config
block_info = req["block_info"]
maybe_create_data_dir(config)
assert DATASTORE_FNAME in os.listdir(config["data_dir"])

# First update the recorded attempts with the new and pass attempts.
in_flight = recorded_attempts(config)
for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]:
del in_flight[op]
for v in block_info["new_attempts"]:
in_flight[v["deposit_outpoint"]] = v["value"]
update_in_flight(config, in_flight)

# Did we get above the threshold? Note we might stay a bit above the threshold
# for the time that the vaults we told the WT to revault previously actually get
# their Cancel transaction confirmed.
resp = {"revault": []}
value_in_flight = sum([in_flight[k] for k in in_flight])
while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0:
v = block_info["new_attempts"].pop(0)
resp["revault"].append(v["deposit_outpoint"])
value_in_flight -= v["value"]
continue

resp = {}
if req["method"] == "new_block":
block_info = req["block_info"]
# First update the recorded attempts with the new and pass attempts.
for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]:
del in_flight[op]
for v in block_info["new_attempts"]:
in_flight[v["deposit_outpoint"]] = v["value"]
update_in_flight(config, in_flight)

# Did we get above the threshold? Note we might stay a bit above the threshold
# for the time that the vaults we told the WT to revault previously actually get
# their Cancel transaction confirmed.
resp = {"revault": []}
value_in_flight = sum([in_flight[k] for k in in_flight])
while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0:
v = block_info["new_attempts"].pop(0)
resp["revault"].append(v["deposit_outpoint"])
value_in_flight -= v["value"]
continue
elif req["method"] == "invalidate_block":
resp = { "log": []}
resp["log"].append(in_flight)
blockheight = req["block_height"]
in_flight = [v for v in in_flight if v["block_height"] < block_height]
resp["log"].append(in_flight)
update_in_flight(config, in_flight)
else:
# TODO: maybe we should reply saying that we don't know what
# they're talking about?
pass

sys.stdout.write(json.dumps(resp))
sys.stdout.flush()
Loading

0 comments on commit d3dfc21

Please sign in to comment.