diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 95d0b5f8a7b..9f12ff3dba3 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -3150,8 +3150,11 @@ impl ChannelMonitorImpl { panic!("Attempted to apply post-force-close ChannelMonitorUpdate that wasn't providing a payment preimage"); }, } - } else if self.latest_update_id + 1 != updates.update_id { - panic!("Attempted to apply ChannelMonitorUpdates out of order, check the update_id before passing an update to update_monitor!"); + } + if updates.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID { + if self.latest_update_id + 1 != updates.update_id { + panic!("Attempted to apply ChannelMonitorUpdates out of order, check the update_id before passing an update to update_monitor!"); + } } let mut ret = Ok(()); let bounded_fee_estimator = LowerBoundedFeeEstimator::new(&**fee_estimator); diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 5b276b1c2ae..dc67b198149 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -3288,7 +3288,7 @@ fn do_test_durable_preimages_on_closed_channel(close_chans_before_reload: bool, // Finally, check that B created a payment preimage transaction and close out the payment. let bs_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); assert_eq!(bs_txn.len(), if close_chans_before_reload && !close_only_a { 2 } else { 1 }); - let bs_preimage_tx = &bs_txn[0]; + let bs_preimage_tx = bs_txn.iter().find(|tx| tx.input[0].previous_output.txid == as_closing_tx[0].compute_txid()).unwrap(); check_spends!(bs_preimage_tx, as_closing_tx[0]); if !close_chans_before_reload { diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 86faeac9a68..073c83fc0f1 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1332,6 +1332,11 @@ impl PeerState where SP::Target: SignerProvider { if require_disconnected && self.is_connected { return false } + for (_, updates) in self.in_flight_monitor_updates.iter() { + if !updates.is_empty() { + return false; + } + } !self.channel_by_id.iter().any(|(_, phase)| match phase { ChannelPhase::Funded(_) | ChannelPhase::UnfundedOutboundV1(_) => true, @@ -1341,7 +1346,6 @@ impl PeerState where SP::Target: SignerProvider { } ) && self.monitor_update_blocked_actions.is_empty() - && self.in_flight_monitor_updates.is_empty() && self.closed_channel_monitor_update_ids.is_empty() } @@ -2938,8 +2942,34 @@ macro_rules! handle_error { /// /// Note that this step can be skipped if the channel was never opened (through the creation of a /// [`ChannelMonitor`]/channel funding transaction) to begin with. -macro_rules! update_maps_on_chan_removal { - ($self: expr, $peer_state: expr, $channel_context: expr) => {{ +macro_rules! 
locked_close_channel { + ($self: ident, $peer_state: expr, $channel_context: expr, $shutdown_res_mut: expr) => {{ + if let Some((counterparty_node_id, funding_txo, channel_id, update)) = $shutdown_res_mut.monitor_update.take() { + if $self.background_events_processed_since_startup.load(Ordering::Acquire) { + handle_new_monitor_update!($self, funding_txo, update, $peer_state, + $channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER); + } else { + // We want to track the in-flight update both in `in_flight_monitor_updates` and in + // `pending_background_events` to avoid a race condition during + // `pending_background_events` processing where we complete one + // `ChannelMonitorUpdate` (but there are more pending) but we conclude that all + // pending `ChannelMonitorUpdate`s have completed and it's safe to run + // post-completion actions. We could work around that with some effort, but it's + // simpler to just track updates twice. + let in_flight_updates = $peer_state.in_flight_monitor_updates.entry(funding_txo) + .or_insert_with(Vec::new); + if !in_flight_updates.contains(&update) { + in_flight_updates.push(update.clone()); + } + let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, + funding_txo, + channel_id, + update, + }; + $self.pending_background_events.lock().unwrap().push(event); + } + } // If there's a possibility that we need to generate further monitor updates for this // channel, we need to store the last update_id of it. However, we don't want to insert // into the map (which prevents the `PeerState` from being cleaned up) for channels that @@ -2999,8 +3029,8 @@ macro_rules! convert_chan_phase_err { ChannelError::Close((msg, reason)) => { let logger = WithChannelContext::from(&$self.logger, &$channel.context, None); log_error!(logger, "Closing channel {} due to close-required error: {}", $channel_id, msg); - update_maps_on_chan_removal!($self, $peer_state, $channel.context); - let shutdown_res = $channel.context.force_shutdown(true, reason); + let mut shutdown_res = $channel.context.force_shutdown(true, reason); + locked_close_channel!($self, $peer_state, &$channel.context, &mut shutdown_res); let err = MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $channel_update); (true, err) @@ -3067,10 +3097,10 @@ macro_rules! try_chan_phase_entry { } macro_rules! remove_channel_phase { - ($self: expr, $peer_state: expr, $entry: expr) => { + ($self: ident, $peer_state: expr, $entry: expr, $shutdown_res_mut: expr) => { { let channel = $entry.remove_entry().1; - update_maps_on_chan_removal!($self, $peer_state, &channel.context()); + locked_close_channel!($self, $peer_state, &channel.context(), $shutdown_res_mut); channel } } @@ -3241,18 +3271,17 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $chan: expr, _internal, $completed: expr) => { { + ($self: ident, $update_res: expr, $logger: expr, $channel_id: expr, _internal, $completed: expr) => { { debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); - let logger = WithChannelContext::from(&$self.logger, &$chan.context, None); match $update_res { ChannelMonitorUpdateStatus::UnrecoverableError => { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); + log_error!($logger, "{}", err_str); panic!("{}", err_str); }, ChannelMonitorUpdateStatus::InProgress => { - log_debug!(logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", - &$chan.context.channel_id()); + log_debug!($logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", + $channel_id); false }, ChannelMonitorUpdateStatus::Completed => { @@ -3262,22 +3291,52 @@ macro_rules! handle_new_monitor_update { } } }; ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => { - handle_new_monitor_update!($self, $update_res, $chan, _internal, + let logger = WithChannelContext::from(&$self.logger, &$chan.context, None); + handle_new_monitor_update!($self, $update_res, logger, $chan.context.channel_id(), _internal, handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) }; - ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { - let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) + ( + $self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr, + $chan_id: expr, $in_flight_updates: ident, $update_idx: ident, _internal_outer, + $completed: expr + ) => { { + $in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) .or_insert_with(Vec::new); // During startup, we push monitor updates as background events through to here in // order to replay updates that were in-flight when we shut down. Thus, we have to // filter for uniqueness here. 
- let idx = in_flight_updates.iter().position(|upd| upd == &$update) + $update_idx = $in_flight_updates.iter().position(|upd| upd == &$update) .unwrap_or_else(|| { - in_flight_updates.push($update); - in_flight_updates.len() - 1 + $in_flight_updates.push($update); + $in_flight_updates.len() - 1 }); - let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); - handle_new_monitor_update!($self, update_res, $chan, _internal, + let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]); + handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed) + } }; + ( + $self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr, + REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER + ) => { { + let logger = WithChannelContext::from(&$self.logger, &$chan_context, None); + let chan_id = $chan_context.channel_id(); + let in_flight_updates; + let idx; + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id, + in_flight_updates, idx, _internal_outer, + { + let _ = in_flight_updates.remove(idx); + }) + } }; + ( + $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, + $per_peer_state_lock: expr, $chan: expr + ) => { { + let logger = WithChannelContext::from(&$self.logger, &$chan.context, None); + let chan_id = $chan.context.channel_id(); + let in_flight_updates; + let idx; + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id, + in_flight_updates, idx, _internal_outer, { let _ = in_flight_updates.remove(idx); if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { @@ -3760,8 +3819,10 @@ where peer_state_lock, peer_state, per_peer_state, chan); } } else { - let mut chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry); - shutdown_result = Some(chan_phase.context_mut().force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) })); + let mut shutdown_res = chan_phase_entry.get_mut().context_mut() + .force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) }); + remove_channel_phase!(self, peer_state, chan_phase_entry, shutdown_res); + shutdown_result = Some(shutdown_res); } }, hash_map::Entry::Vacant(_) => { @@ -3882,8 +3943,8 @@ where && matches!(&monitor_update.updates[0], ChannelMonitorUpdateStep::ChannelForceClosed { .. }); // If the ChannelMonitorUpdate is closing a channel that never got past initial // funding (to have any commitment updates), we'll skip inserting in - // `update_maps_on_chan_removal`, allowing us to avoid keeping around the PeerState - // for that peer. In that specific case we expect no entry in the map here. In any + // `locked_close_channel`, allowing us to avoid keeping around the PeerState for + // that peer. In that specific case we expect no entry in the map here. In any // other cases, this is a bug, but in production we go ahead and recover by // inserting the update_id and hoping its right. 
debug_assert!(is_closing_unupdated_monitor, "Expected closing monitor against an unused channel, got {:?}", monitor_update); @@ -3907,7 +3968,7 @@ where } /// When a channel is removed, two things need to happen: - /// (a) [`update_maps_on_chan_removal`] must be called in the same `per_peer_state` lock as + /// (a) [`locked_close_channel`] must be called in the same `per_peer_state` lock as /// the channel-closing action, /// (b) this needs to be called without holding any locks (except /// [`ChannelManager::total_consistency_lock`]. @@ -3931,12 +3992,31 @@ where self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } if let Some((_, funding_txo, _channel_id, monitor_update)) = shutdown_res.monitor_update { - // There isn't anything we can do if we get an update failure - we're already - // force-closing. The monitor update on the required in-memory copy should broadcast - // the latest local state, which is the best we can do anyway. Thus, it is safe to - // ignore the result here. + debug_assert!(false, "This should have been handled in `locked_close_channel`"); let _ = self.apply_post_close_monitor_update(shutdown_res.counterparty_node_id, shutdown_res.channel_id, funding_txo, monitor_update); } + if self.background_events_processed_since_startup.load(Ordering::Acquire) { + // If a `ChannelMonitorUpdate` was applied (i.e. any time we have a funding txo and are + // not in the startup sequence) check if we need to handle any + // `MonitorUpdateCompletionAction`s. + // TODO: If we do the `in_flight_monitor_updates.is_empty()` check in + // `locked_close_channel` we can skip the locks here. + if let Some(funding_txo) = shutdown_res.channel_funding_txo { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mtx) = per_peer_state.get(&shutdown_res.counterparty_node_id) { + let mut peer_state = peer_state_mtx.lock().unwrap(); + if peer_state.in_flight_monitor_updates.get(&funding_txo).map(|l| l.is_empty()).unwrap_or(true) { + let update_actions = peer_state.monitor_update_blocked_actions + .remove(&shutdown_res.channel_id).unwrap_or(Vec::new()); + + mem::drop(peer_state); + mem::drop(per_peer_state); + + self.handle_monitor_update_completion_actions(update_actions); + } + } + } + } let mut shutdown_results = Vec::new(); if let Some(txid) = shutdown_res.unbroadcasted_batch_funding_txid { let mut funding_batch_states = self.funding_batch_states.lock().unwrap(); @@ -3947,8 +4027,9 @@ where if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { let mut peer_state = peer_state_mutex.lock().unwrap(); if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) { - update_maps_on_chan_removal!(self, peer_state, &chan.context()); - shutdown_results.push(chan.context_mut().force_shutdown(false, ClosureReason::FundingBatchClosure)); + let mut close_res = chan.context_mut().force_shutdown(false, ClosureReason::FundingBatchClosure); + locked_close_channel!(self, &mut *peer_state, chan.context(), close_res); + shutdown_results.push(close_res); } } has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state)); @@ -4005,23 +4086,26 @@ where ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(broadcast) } }; let logger = WithContext::from(&self.logger, Some(*peer_node_id), Some(*channel_id), None); - if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id.clone()) { + if let hash_map::Entry::Occupied(mut chan_phase_entry) = 
peer_state.channel_by_id.entry(channel_id.clone()) { log_error!(logger, "Force-closing channel {}", channel_id); - let mut chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry); - mem::drop(peer_state); - mem::drop(per_peer_state); - match chan_phase { - ChannelPhase::Funded(mut chan) => { - self.finish_close_channel(chan.context.force_shutdown(broadcast, closure_reason)); - (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id()) + let (mut shutdown_res, update_opt) = match chan_phase_entry.get_mut() { + ChannelPhase::Funded(ref mut chan) => { + ( + chan.context.force_shutdown(broadcast, closure_reason), + self.get_channel_update_for_broadcast(&chan).ok(), + ) }, ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV2(_) | ChannelPhase::UnfundedInboundV2(_) => { - self.finish_close_channel(chan_phase.context_mut().force_shutdown(false, closure_reason)); // Unfunded channel has no update - (None, chan_phase.context().get_counterparty_node_id()) + (chan_phase_entry.get_mut().context_mut().force_shutdown(false, closure_reason), None) }, - } + }; + let chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry, shutdown_res); + mem::drop(peer_state); + mem::drop(per_peer_state); + self.finish_close_channel(shutdown_res); + (update_opt, chan_phase.context().get_counterparty_node_id()) } else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() { log_error!(logger, "Force-closing channel {}", &channel_id); // N.B. that we don't send any channel close event here: we @@ -5285,9 +5369,10 @@ where .map(|peer_state_mutex| peer_state_mutex.lock().unwrap()) .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id).map(|chan| (chan, peer_state))) .map(|(mut chan, mut peer_state)| { - update_maps_on_chan_removal!(self, peer_state, &chan.context()); let closure_reason = ClosureReason::ProcessingError { err: e.clone() }; - shutdown_results.push(chan.context_mut().force_shutdown(false, closure_reason)); + let mut close_res = chan.context_mut().force_shutdown(false, closure_reason); + locked_close_channel!(self, peer_state, chan.context(), close_res); + shutdown_results.push(close_res); peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: counterparty_node_id, action: msgs::ErrorAction::SendErrorMessage { @@ -6387,8 +6472,9 @@ where log_error!(logger, "Force-closing pending channel with ID {} for not establishing in a timely manner", context.channel_id()); - update_maps_on_chan_removal!(self, $peer_state, context); - shutdown_channels.push(context.force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) })); + let mut close_res = context.force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) }); + locked_close_channel!(self, $peer_state, context, close_res); + shutdown_channels.push(close_res); $pending_msg_events.push(MessageSendEvent::HandleError { node_id: context.get_counterparty_node_id(), action: msgs::ErrorAction::SendErrorMessage { @@ -7579,30 +7665,36 @@ where if peer_state_mutex_opt.is_none() { return } peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - let channel = - if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(channel_id) { - chan - } else { - let update_actions = peer_state.monitor_update_blocked_actions - .remove(channel_id).unwrap_or(Vec::new()); - 
mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(update_actions); - return; - }; + let remaining_in_flight = if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) { pending.retain(|upd| upd.update_id > highest_applied_update_id); pending.len() } else { 0 }; - let logger = WithChannelContext::from(&self.logger, &channel.context, None); - log_trace!(logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.", - highest_applied_update_id, channel.context.get_latest_monitor_update_id(), - remaining_in_flight); - if !channel.is_awaiting_monitor_update() || remaining_in_flight != 0 { + + let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(*channel_id), None); + log_trace!(logger, "ChannelMonitor updated to {}. {} pending in-flight updates.", + highest_applied_update_id, remaining_in_flight); + + if remaining_in_flight != 0 { return; } - handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel); + + if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(channel_id) { + if chan.is_awaiting_monitor_update() { + log_trace!(logger, "Channel is open and awaiting update, resuming it"); + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan); + } else { + log_trace!(logger, "Channel is open but not awaiting update"); + } + } else { + let update_actions = peer_state.monitor_update_blocked_actions + .remove(channel_id).unwrap_or(Vec::new()); + log_trace!(logger, "Channel is closed, applying {} post-update actions", update_actions.len()); + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(update_actions); + } } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. @@ -8072,9 +8164,8 @@ where // Note that at this point we've filled in the funding outpoint on our // channel, but its actually in conflict with another channel. Thus, if // we call `convert_chan_phase_err` immediately (thus calling - // `update_maps_on_chan_removal`), we'll remove the existing channel - // from `outpoint_to_peer`. Thus, we must first unset the funding outpoint - // on the channel. + // `locked_close_channel`), we'll remove the existing channel from `outpoint_to_peer`. + // Thus, we must first unset the funding outpoint on the channel. 
let err = ChannelError::close($err.to_owned()); chan.unset_funding_info(msg.temporary_channel_id); return Err(convert_chan_phase_err!(self, peer_state, err, chan, &funded_channel_id, UNFUNDED_CHANNEL).1); @@ -8564,9 +8655,12 @@ where }, ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV2(_) | ChannelPhase::UnfundedOutboundV2(_) => { - log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); - let mut chan = remove_channel_phase!(self, peer_state, chan_phase_entry); - finish_shutdown = Some(chan.context_mut().force_shutdown(false, ClosureReason::CounterpartyCoopClosedUnfundedChannel)); + let context = phase.context_mut(); + let logger = WithChannelContext::from(&self.logger, context, None); + log_error!(logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); + let mut close_res = phase.context_mut().force_shutdown(false, ClosureReason::CounterpartyCoopClosedUnfundedChannel); + remove_channel_phase!(self, peer_state, chan_phase_entry, close_res); + finish_shutdown = Some(close_res); }, } } else { @@ -8607,14 +8701,19 @@ where msg, }); } - if tx.is_some() { + if let Some(mut close_res) = shutdown_result { // We're done with this channel, we've got a signed closing transaction and // will send the closing_signed back to the remote peer upon return. This // also implies there are no pending HTLCs left on the channel, so we can // fully delete it from tracking (the channel monitor is still around to // watch for old state broadcasts)! - (tx, Some(remove_channel_phase!(self, peer_state, chan_phase_entry)), shutdown_result) - } else { (tx, None, shutdown_result) } + debug_assert!(tx.is_some()); + let channel_phase = remove_channel_phase!(self, peer_state, chan_phase_entry, close_res); + (tx, Some(channel_phase), Some(close_res)) + } else { + debug_assert!(tx.is_none()); + (tx, None, None) + } } else { return try_chan_phase_entry!(self, peer_state, Err(ChannelError::close( "Got a closing_signed message for an unfunded channel!".into())), chan_phase_entry); @@ -9337,14 +9436,16 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; - if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) { - if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, peer_state, chan_phase_entry) { - let reason = if let MonitorEvent::HolderForceClosedWithInfo { reason, .. } = monitor_event { - reason - } else { - ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) } - }; - failed_channels.push(chan.context.force_shutdown(false, reason.clone())); + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) { + let reason = if let MonitorEvent::HolderForceClosedWithInfo { reason, .. 
} = monitor_event { + reason + } else { + ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) } + }; + let mut shutdown_res = chan_phase_entry.get_mut().context_mut().force_shutdown(false, reason.clone()); + let chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry, shutdown_res); + failed_channels.push(shutdown_res); + if let ChannelPhase::Funded(chan) = chan_phase { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { @@ -9354,7 +9455,10 @@ where pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.context.get_counterparty_node_id(), action: msgs::ErrorAction::DisconnectPeer { - msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: reason.to_string() }) + msg: Some(msgs::ErrorMessage { + channel_id: chan.context.channel_id(), + data: reason.to_string() + }) }, }); } @@ -9534,7 +9638,7 @@ where let context = &chan.context(); let logger = WithChannelContext::from(&self.logger, context, None); log_trace!(logger, "Removing channel {} now that the signer is unblocked", context.channel_id()); - update_maps_on_chan_removal!(self, peer_state, context); + locked_close_channel!(self, peer_state, context, shutdown_result); shutdown_results.push(shutdown_result); false } else { @@ -9575,7 +9679,8 @@ where }); } debug_assert_eq!(shutdown_result_opt.is_some(), chan.is_shutdown()); - if let Some(shutdown_result) = shutdown_result_opt { + if let Some(mut shutdown_result) = shutdown_result_opt { + locked_close_channel!(self, peer_state, &chan.context, shutdown_result); shutdown_results.push(shutdown_result); } if let Some(tx) = tx_opt { @@ -9590,7 +9695,6 @@ where log_info!(logger, "Broadcasting {}", log_tx!(tx)); self.tx_broadcaster.broadcast_transactions(&[&tx]); - update_maps_on_chan_removal!(self, peer_state, &chan.context); false } else { true } }, @@ -9619,32 +9723,6 @@ where has_update } - /// Handle a list of channel failures during a block_connected or block_disconnected call, - /// pushing the channel monitor update (if any) to the background events queue and removing the - /// Channel object. - fn handle_init_event_channel_failures(&self, mut failed_channels: Vec) { - for mut failure in failed_channels.drain(..) { - // Either a commitment transactions has been confirmed on-chain or - // Channel::block_disconnected detected that the funding transaction has been - // reorganized out of the main chain. - // We cannot broadcast our latest local state via monitor update (as - // Channel::force_shutdown tries to make us do) as we may still be in initialization, - // so we track the update internally and handle it when the user next calls - // timer_tick_occurred, guaranteeing we're running normally. - if let Some((counterparty_node_id, funding_txo, channel_id, update)) = failure.monitor_update.take() { - assert_eq!(update.updates.len(), 1); - if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] { - assert!(should_broadcast); - } else { unreachable!(); } - self.pending_background_events.lock().unwrap().push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id, funding_txo, update, channel_id, - }); - } - self.finish_close_channel(failure); - } - } - /// Utility for creating a BOLT11 invoice that can be verified by [`ChannelManager`] without /// storing any additional state. 
It achieves this by including a [`PaymentSecret`] in the /// invoice which it uses to verify that the invoice has not expired and the payment amount is @@ -11056,11 +11134,12 @@ where } } } else if let Err(reason) = res { - update_maps_on_chan_removal!(self, peer_state, &channel.context); // It looks like our counterparty went on-chain or funding transaction was // reorged out of the main chain. Close the channel. let reason_message = format!("{}", reason); - failed_channels.push(channel.context.force_shutdown(true, reason)); + let mut close_res = channel.context.force_shutdown(true, reason); + locked_close_channel!(self, peer_state, &channel.context, close_res); + failed_channels.push(close_res); if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { @@ -11136,7 +11215,9 @@ where }); } - self.handle_init_event_channel_failures(failed_channels); + for failure in failed_channels { + self.finish_close_channel(failure); + } for (source, payment_hash, reason, destination) in timed_out_htlcs.drain(..) { self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, destination); @@ -11495,8 +11576,9 @@ where }, }; // Clean up for removal. - update_maps_on_chan_removal!(self, peer_state, &context); - failed_channels.push(context.force_shutdown(false, ClosureReason::DisconnectedPeer)); + let mut close_res = context.force_shutdown(false, ClosureReason::DisconnectedPeer); + locked_close_channel!(self, peer_state, &context, close_res); + failed_channels.push(close_res); false }); // Note that we don't bother generating any events for pre-accept channels - @@ -13595,8 +13677,36 @@ where } } - // Note that we have to do the above replays before we push new monitor updates. - pending_background_events.append(&mut close_background_events); + // The newly generated `close_background_events` have to be added after any updates that + // were already in-flight on shutdown, so we append them here. + pending_background_events.reserve(close_background_events.len()); + 'each_bg_event: for mut new_event in close_background_events { + if let BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, funding_txo, channel_id, update, + } = &mut new_event { + debug_assert_eq!(update.updates.len(), 1); + debug_assert!(matches!(update.updates[0], ChannelMonitorUpdateStep::ChannelForceClosed { .. })); + for pending_event in pending_background_events.iter() { + if let BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: pending_cp, funding_txo: pending_funding, + channel_id: pending_chan_id, update: pending_update, + } = pending_event { + let for_same_channel = counterparty_node_id == pending_cp + && funding_txo == pending_funding + && channel_id == pending_chan_id; + if for_same_channel { + if pending_update.updates.iter().any(|upd| matches!(upd, ChannelMonitorUpdateStep::ChannelForceClosed { .. })) { + // If the background event we're looking at is just + // force-closing the channel which already has a pending + // force-close update, no need to duplicate it. + continue 'each_bg_event; + } + } + } + } + } + pending_background_events.push(new_event); + } // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we // should ensure we try them again on the inbound edge. 
We put them here and do so after we diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index c629c5bbce6..b1b4f77c590 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -268,6 +268,9 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ assert_eq!(nodes[1].node.list_channels()[0].confirmations, Some(10)); if !reorg_after_reload { + // With expect_channel_force_closed set the TestChainMonitor will enforce that the next update + // is a ChannelForceClosed on the right channel with should_broadcast set. + *nodes[0].chain_monitor.expect_channel_force_closed.lock().unwrap() = Some((chan.2, true)); if use_funding_unconfirmed { let relevant_txids = nodes[0].node.get_relevant_txids(); assert_eq!(relevant_txids.len(), 1); @@ -293,12 +296,17 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ let relevant_txids = nodes[0].node.get_relevant_txids(); assert_eq!(relevant_txids.len(), 0); + let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + assert_eq!(txn.len(), 1); + { let per_peer_state = nodes[0].node.per_peer_state.read().unwrap(); let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap(); assert_eq!(peer_state.channel_by_id.len(), 0); assert_eq!(nodes[0].node.short_to_chan_info.read().unwrap().len(), 0); } + + check_added_monitors!(nodes[0], 1); } if reload_node { @@ -310,10 +318,13 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ let chan_0_monitor_serialized = get_monitor!(nodes[0], chan.2).encode(); reload_node!(nodes[0], *nodes[0].node.get_current_default_configuration(), &nodes_0_serialized, &[&chan_0_monitor_serialized], persister, new_chain_monitor, nodes_0_deserialized); - assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); } if reorg_after_reload { + // With expect_channel_force_closed set the TestChainMonitor will enforce that the next update + // is a ChannelForceClosed on the right channel with should_broadcast set. + *nodes[0].chain_monitor.expect_channel_force_closed.lock().unwrap() = Some((chan.2, true)); + if use_funding_unconfirmed { let relevant_txids = nodes[0].node.get_relevant_txids(); assert_eq!(relevant_txids.len(), 1); @@ -345,12 +356,18 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ assert_eq!(peer_state.channel_by_id.len(), 0); assert_eq!(nodes[0].node.short_to_chan_info.read().unwrap().len(), 0); } + + if reload_node { + // The update may come when we free background events if we just restarted, or in-line if + // we were already running. + nodes[0].node.test_process_background_events(); + } + check_added_monitors!(nodes[0], 1); + + let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + assert_eq!(txn.len(), 1); } - // With expect_channel_force_closed set the TestChainMonitor will enforce that the next update - // is a ChannelForcClosed on the right channel with should_broadcast set. - *nodes[0].chain_monitor.expect_channel_force_closed.lock().unwrap() = Some((chan.2, true)); - nodes[0].node.test_process_background_events(); // Required to free the pending background monitor update - check_added_monitors!(nodes[0], 1); + let expected_err = "Funding transaction was un-confirmed. 
Locked at 6 confs, now have 0 confs."; if reorg_after_reload || !reload_node { handle_announce_close_broadcast_events(&nodes, 0, 1, true, "Channel closed because of an exception: Funding transaction was un-confirmed. Locked at 6 confs, now have 0 confs."); @@ -361,8 +378,6 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: expected_err.to_owned() }, [nodes[1].node.get_our_node_id()], 100000); - assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 1); - nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear(); // Now check that we can create a new channel if reload_node && nodes[0].node.per_peer_state.read().unwrap().len() == 0 {
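Aside on the pattern above: the `handle_new_monitor_update!` and `locked_close_channel` changes hinge on keeping each channel's in-flight `ChannelMonitorUpdate`s unique, because an update replayed as a background event on startup may already sit in `in_flight_monitor_updates`. Below is a minimal, standalone Rust sketch of that uniqueness-filtering bookkeeping; `MonitorUpdate`, `track_in_flight`, and the `u64` key are simplified stand-ins for illustration, not LDK's actual types or API.

use std::collections::HashMap;

// Simplified stand-in for a ChannelMonitorUpdate; equality is what the dedup relies on.
#[derive(Clone, PartialEq, Eq, Debug)]
struct MonitorUpdate { update_id: u64 }

// Queue `update` in the per-channel in-flight list unless an identical update is already
// present (e.g. because it was replayed from `pending_background_events` on startup), and
// return the index of the stored copy that should be handed to the persister.
fn track_in_flight(
    in_flight: &mut HashMap<u64, Vec<MonitorUpdate>>, // keyed by a stand-in funding outpoint
    funding_txo: u64,
    update: MonitorUpdate,
) -> usize {
    let updates = in_flight.entry(funding_txo).or_insert_with(Vec::new);
    updates.iter().position(|queued| queued == &update).unwrap_or_else(|| {
        updates.push(update);
        updates.len() - 1
    })
}

fn main() {
    let mut in_flight = HashMap::new();
    let update = MonitorUpdate { update_id: 7 };

    // Queue the update once, then "replay" it as a startup background event would.
    let first = track_in_flight(&mut in_flight, 1, update.clone());
    let second = track_in_flight(&mut in_flight, 1, update);

    // Both attempts resolve to the same slot instead of duplicating the update, so the
    // list only empties after a single completion.
    assert_eq!(first, second);
    assert_eq!(in_flight[&1u64].len(), 1);
}

Completion then mirrors the macro's `_internal_outer` arm in the diff: the finished entry is removed from the list, and blocked `MonitorUpdateCompletionAction`s are only processed once the channel's in-flight list is empty.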