Skip to content

Commit

Permalink
DRY the pre-startup ChannelMonitorUpdate handling
Browse files Browse the repository at this point in the history
This moves the common `if during_startup { push background event }
else { apply ChannelMonitorUpdate }` pattern by simply inlining it
in `handle_new_monitor_update`.

It also ensures we always insert `ChannelMonitorUpdate`s in the
pending updates set when we push the background event, avoiding a
race where we push an update as a background event, then while its
processing another update finishes and the post-update actions get
run.
  • Loading branch information
TheBlueMatt committed Nov 18, 2024
1 parent ce9e5a7 commit 8fca379
Showing 1 changed file with 45 additions and 70 deletions.
115 changes: 45 additions & 70 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2929,26 +2929,9 @@ macro_rules! handle_error {
/// [`ChannelMonitor`]/channel funding transaction) to begin with.
macro_rules! locked_close_channel {
($self: ident, $peer_state: expr, $channel_context: expr, $shutdown_res_mut: expr) => {{
if let Some((counterparty_node_id, funding_txo, channel_id, update)) = $shutdown_res_mut.monitor_update.take() {
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
handle_new_monitor_update!($self, funding_txo, update, $peer_state,
$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
} else {
let in_flight_updates = $peer_state.in_flight_monitor_updates.entry(funding_txo)
.or_insert_with(Vec::new);
in_flight_updates.iter().position(|upd| upd == &update)
.unwrap_or_else(|| {
in_flight_updates.push(update.clone());
0
});
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo,
channel_id,
update,
};
$self.pending_background_events.lock().unwrap().push(event);
}
if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() {
handle_new_monitor_update!($self, funding_txo, update, $peer_state,
$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
}
// If there's a possibility that we need to generate further monitor updates for this
// channel, we need to store the last update_id of it. However, we don't want to insert
Expand Down Expand Up @@ -3279,8 +3262,8 @@ macro_rules! handle_new_monitor_update {
};
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr,
$chan_id: expr, $in_flight_updates: ident, $update_idx: ident, _internal_outer,
$completed: expr
$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
_internal_outer, $completed: expr
) => { {
$in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
.or_insert_with(Vec::new);
Expand All @@ -3292,31 +3275,47 @@ macro_rules! handle_new_monitor_update {
$in_flight_updates.push($update);
$in_flight_updates.len() - 1
});
let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
} else {
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
// fail to persist it. This is a fairly safe assumption, however, since anything we do
// during the startup sequence should be replayed exactly if we immediately crash.
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id: $counterparty_node_id,
funding_txo: $funding_txo,
channel_id: $chan_id,
update: $in_flight_updates[$update_idx].clone(),
};
$self.pending_background_events.lock().unwrap().push(event);
false
}
} };
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr,
REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER
) => { {
let logger = WithChannelContext::from(&$self.logger, &$chan_context, None);
let chan_id = $chan_context.channel_id();
let counterparty_node_id = $chan_context.get_counterparty_node_id();
let in_flight_updates;
let idx;
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
in_flight_updates, idx, _internal_outer,
counterparty_node_id, in_flight_updates, idx, _internal_outer,
{
let _ = in_flight_updates.remove(idx);
})
} };
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $logger: expr, $channel_id: expr, POST_CHANNEL_CLOSE
$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr, POST_CHANNEL_CLOSE
) => { {
let logger = WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None);
let in_flight_updates;
let idx;
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, $logger,
$channel_id, in_flight_updates, idx, _internal_outer,
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger,
$channel_id, $counterparty_node_id, in_flight_updates, idx, _internal_outer,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() {
Expand All @@ -3336,10 +3335,11 @@ macro_rules! handle_new_monitor_update {
) => { {
let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
let chan_id = $chan.context.channel_id();
let counterparty_node_id = $chan.context.get_counterparty_node_id();
let in_flight_updates;
let idx;
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
in_flight_updates, idx, _internal_outer,
counterparty_node_id, in_flight_updates, idx, _internal_outer,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
Expand Down Expand Up @@ -3964,11 +3964,10 @@ where
},
hash_map::Entry::Vacant(_) => {},
}
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None);

handle_new_monitor_update!(
self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state,
logger, channel_id, POST_CHANNEL_CLOSE
counterparty_node_id, channel_id, POST_CHANNEL_CLOSE
);
}

Expand Down Expand Up @@ -7160,7 +7159,6 @@ where
let peer_state = &mut **peer_state_lock;
if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) {
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
let counterparty_node_id = chan.context.get_counterparty_node_id();
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger);

Expand All @@ -7175,21 +7173,8 @@ where
if let Some(raa_blocker) = raa_blocker_opt {
peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
}
if !during_init {
handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
peer_state, per_peer_state, chan);
} else {
// If we're running during init we cannot update a monitor directly -
// they probably haven't actually been loaded yet. Instead, push the
// monitor update as a background event.
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo: prev_hop.funding_txo,
channel_id: prev_hop.channel_id,
update: monitor_update.clone(),
});
}
handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
peer_state, per_peer_state, chan);
}
UpdateFulfillCommitFetch::DuplicateClaim {} => {
let (action_opt, raa_blocker_opt) = completion_action(None, true);
Expand Down Expand Up @@ -7304,26 +7289,10 @@ where
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}

if !during_init {
handle_new_monitor_update!(self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state, logger, chan_id, POST_CHANNEL_CLOSE);
} else {
// If we're running during init we cannot update a monitor directly - they probably
// haven't actually been loaded yet. Instead, push the monitor update as a background
// event.

let in_flight_updates = peer_state.in_flight_monitor_updates
.entry(prev_hop.funding_txo)
.or_insert_with(Vec::new);
in_flight_updates.push(preimage_update.clone());

let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo: prev_hop.funding_txo,
channel_id: prev_hop.channel_id,
update: preimage_update,
};
self.pending_background_events.lock().unwrap().push(event);
}
handle_new_monitor_update!(
self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state,
counterparty_node_id, chan_id, POST_CHANNEL_CLOSE
);
}

fn finalize_claims(&self, sources: Vec<HTLCSource>) {
Expand Down Expand Up @@ -13342,14 +13311,20 @@ where
}
}
}
let mut per_peer_state = per_peer_state.get(counterparty_node_id)
.expect("If we have pending updates for a channel it has to have an entry")
.lock().unwrap();
if updated_id {
per_peer_state.get(counterparty_node_id)
.expect("If we have pending updates for a channel it has to have an entry")
.lock().unwrap()
per_peer_state
.closed_channel_monitor_update_ids.entry(*channel_id)
.and_modify(|v| *v = cmp::max(update.update_id, *v))
.or_insert(update.update_id);
}
let in_flight_updates = per_peer_state.in_flight_monitor_updates
.entry(*funding_txo)
.or_insert_with(Vec::new);
debug_assert!(!in_flight_updates.iter().any(|upd| upd == update));
in_flight_updates.push(update.clone());
}
pending_background_events.push(new_event);
}
Expand Down

0 comments on commit 8fca379

Please sign in to comment.