Commit 87ec087

Add infra to block ChannelMonitorUpdates on forwarded claims
When we forward a payment and receive an `update_fulfill_htlc` message from the downstream channel, we immediately claim the HTLC on the upstream channel, before even doing a `commitment_signed` dance on the downstream channel. This implies that our `ChannelMonitorUpdate`s "go out" in the right order - first we ensure we'll get our money by writing the preimage down, then we write the update that resolves giving money on the downstream node. This is safe as long as `ChannelMonitorUpdate`s complete in the order in which they are generated, but of course looking forward we want to support asynchronous updates, which may complete in any order.

Here we add infrastructure to handle downstream `ChannelMonitorUpdate`s which are blocked on an upstream preimage-containing one. We don't yet actually do the blocking, which will come in a future commit.
1 parent 5239cde commit 87ec087
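To make the blocking described above concrete, here is a minimal, self-contained sketch of the bookkeeping idea, assuming simplified stand-in types rather than LDK's actual ones: the hypothetical `Blocker` struct and `PeerBlockers` map below play the roles of `RAAMonitorUpdateBlockingAction` and the per-peer `actions_blocking_raa_monitor_updates` map this commit adds. The downstream channel's RAA `ChannelMonitorUpdate` is held while a blocker for the upstream, preimage-carrying update is outstanding, and only released once that blocker completes.

use std::collections::BTreeMap;

type ChannelId = [u8; 32];

/// Hypothetical stand-in for `RAAMonitorUpdateBlockingAction`: identifies the upstream,
/// preimage-carrying claim that must be persisted before the blocked channel may move on.
#[derive(Clone, PartialEq, Eq, Debug)]
struct Blocker {
    upstream_channel_id: ChannelId,
    htlc_id: u64,
}

/// Hypothetical stand-in for the per-peer state holding `actions_blocking_raa_monitor_updates`.
#[derive(Default)]
struct PeerBlockers {
    /// Blocked (downstream) channel ID -> blockers that must complete before that channel's
    /// RAA `ChannelMonitorUpdate` is released.
    actions_blocking_raa_monitor_updates: BTreeMap<ChannelId, Vec<Blocker>>,
}

impl PeerBlockers {
    /// Record that `blocked_channel`'s next RAA monitor update must wait on `blocker`.
    fn add_blocker(&mut self, blocked_channel: ChannelId, blocker: Blocker) {
        self.actions_blocking_raa_monitor_updates
            .entry(blocked_channel)
            .or_insert_with(Vec::new)
            .push(blocker);
    }

    /// Called once the upstream preimage-carrying update completes: drop the matching
    /// blocker and report whether the blocked channel may now release its update.
    fn complete_blocker(&mut self, blocked_channel: ChannelId, blocker: &Blocker) -> bool {
        if let Some(blockers) = self.actions_blocking_raa_monitor_updates.get_mut(&blocked_channel) {
            blockers.retain(|b| b != blocker);
        }
        !self.raa_monitor_updates_held(blocked_channel)
    }

    /// True if any blocker is still outstanding for `blocked_channel`.
    fn raa_monitor_updates_held(&self, blocked_channel: ChannelId) -> bool {
        self.actions_blocking_raa_monitor_updates
            .get(&blocked_channel)
            .map(|v| !v.is_empty())
            .unwrap_or(false)
    }
}

fn main() {
    let mut peer = PeerBlockers::default();
    let downstream: ChannelId = [1; 32];
    let blocker = Blocker { upstream_channel_id: [2; 32], htlc_id: 0 };

    // The downstream channel's RAA monitor update is held while the upstream claim is pending.
    peer.add_blocker(downstream, blocker.clone());
    assert!(peer.raa_monitor_updates_held(downstream));

    // Once the upstream, preimage-carrying update completes, the downstream update is released.
    assert!(peer.complete_blocker(downstream, &blocker));
    assert!(!peer.raa_monitor_updates_held(downstream));
}

Note that the real `raa_monitor_updates_held` helper added in this commit additionally treats a pending `ReleaseRAAChannelMonitorUpdate` event as holding the update; the sketch omits that check.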

File tree

1 file changed: +122 -23 lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 122 additions & 23 deletions
@@ -513,12 +513,24 @@ pub(crate) enum MonitorUpdateCompletionAction {
 	/// event can be generated.
 	PaymentClaimed { payment_hash: PaymentHash },
 	/// Indicates an [`events::Event`] should be surfaced to the user.
-	EmitEvent { event: events::Event },
+	EmitEventAndFreeOtherChannel {
+		event: events::Event,
+		downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
+	},
 }

 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	(0, PaymentClaimed) => { (0, payment_hash, required) },
-	(2, EmitEvent) => { (0, event, upgradable_required) },
+	(2, EmitEventAndFreeOtherChannel) => {
+		(0, event, upgradable_required),
+		// LDK prior to 0.0.115 did not have this field as the monitor update application order was
+		// required by clients. If we downgrade to something prior to 0.0.115 this may result in
+		// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
+		// support async monitor updates even in LDK 0.0.115 and once we do we'll require no
+		// downgrades to prior versions. Thus, while this would break on downgrade, we don't
+		// support it even without downgrade, so if it breaks its not on us ¯\_(ツ)_/¯.
+		(1, downstream_counterparty_and_funding_outpoint, option),
+	},
 );

 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -535,6 +547,29 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	};
 );

+#[derive(Clone, PartialEq, Eq, Debug)]
+pub(crate) enum RAAMonitorUpdateBlockingAction {
+	/// The inbound channel's channel_id
+	ForwardedPaymentOtherChannelClaim {
+		channel_id: [u8; 32],
+		htlc_id: u64,
+	},
+}
+
+impl RAAMonitorUpdateBlockingAction {
+	fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
+		Self::ForwardedPaymentOtherChannelClaim {
+			channel_id: prev_hop.outpoint.to_channel_id(),
+			htlc_id: prev_hop.htlc_id,
+		}
+	}
+}
+
+impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
+	(0, ForwardedPaymentOtherChannelClaim) => { (0, channel_id, required), (2, htlc_id, required) }
+;);
+
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -563,6 +598,11 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
 	/// duplicates do not occur, so such channels should fail without a monitor update completing.
 	monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
+	/// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
+	/// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
+	/// will remove a preimage that needs to be durably in an upstream channel first), we put an
+	/// entry here to note that the channel with the key's ID is blocked on a set of actions.
+	actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
 	/// The peer is currently connected (i.e. we've seen a
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
@@ -4379,20 +4419,24 @@ where
 			},
 			HTLCSource::PreviousHopData(hop_data) => {
 				let prev_outpoint = hop_data.outpoint;
+				let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
 				let res = self.claim_funds_from_hop(hop_data, payment_preimage,
 					|htlc_claim_value_msat| {
 						if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
 							let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
 								Some(claimed_htlc_value - forwarded_htlc_value)
 							} else { None };

-							Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
-								fee_earned_msat,
-								claim_from_onchain_tx: from_onchain,
-								prev_channel_id: Some(prev_outpoint.to_channel_id()),
-								next_channel_id: Some(next_channel_outpoint.to_channel_id()),
-								outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
-							}})
+							Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+								event: events::Event::PaymentForwarded {
+									fee_earned_msat,
+									claim_from_onchain_tx: from_onchain,
+									prev_channel_id: Some(prev_outpoint.to_channel_id()),
+									next_channel_id: Some(next_channel_outpoint.to_channel_id()),
+									outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
+								},
+								downstream_counterparty_and_funding_outpoint: None,
+							})
 						} else { None }
 					});
 				if let Err((pk, err)) = res {
@@ -4419,8 +4463,13 @@ where
 					}, None));
 				}
 			},
-			MonitorUpdateCompletionAction::EmitEvent { event } => {
+			MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+				event, downstream_counterparty_and_funding_outpoint
+			} => {
 				self.pending_events.lock().unwrap().push_back((event, None));
+				if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
+					self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
+				}
 			},
 		}
 	}
@@ -5270,6 +5319,36 @@ where
 		}
 	}

+	fn raa_monitor_updates_held(&self,
+		actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
+		channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
+	) -> bool {
+		actions_blocking_raa_monitor_updates
+			.get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
+		|| self.pending_events.lock().unwrap().iter().any(|(_, action)| {
+			action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+				channel_funding_outpoint,
+				counterparty_node_id,
+			})
+		})
+	}
+
+	pub(crate) fn test_raa_monitor_updates_held(&self, counterparty_node_id: PublicKey,
+		channel_id: [u8; 32])
+	-> bool {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+			let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+			let peer_state = &mut *peer_state_lck;
+
+			if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
+				return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					chan.get_funding_txo().unwrap(), counterparty_node_id);
+			}
+		}
+		false
+	}
+
 	fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
 		let (htlcs_to_fail, res) = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
@@ -5939,25 +6018,29 @@ where
 		self.pending_outbound_payments.clear_pending_payments()
 	}

-	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
 		let mut errors = Vec::new();
 		loop {
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
 				let mut peer_state_lck = peer_state_mtx.lock().unwrap();
 				let peer_state = &mut *peer_state_lck;
-				if self.pending_events.lock().unwrap().iter()
-					.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
-						channel_funding_outpoint, counterparty_node_id
-					}))
-				{
-					// Check that, while holding the peer lock, we don't have another event
-					// blocking any monitor updates for this channel. If we do, let those
-					// events be the ones that ultimately release the monitor update(s).
-					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+
+				if let Some(blocker) = &completed_blocker {
+					if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
+						.get_mut(&channel_funding_outpoint.to_channel_id())
+					{
+						blockers.retain(|iter| iter != blocker);
+					}
+				}
+
+				if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					channel_funding_outpoint, counterparty_node_id) {
+					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
 						log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
 					break;
 				}
+
 				if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
 					debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
 					if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
@@ -5999,7 +6082,7 @@ where
 				EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
 					channel_funding_outpoint, counterparty_node_id
 				} => {
-					self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+					self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
 				}
 			}
 		}
@@ -6650,6 +6733,7 @@ where
 					latest_features: init_msg.features.clone(),
 					pending_msg_events: Vec::new(),
 					monitor_update_blocked_actions: BTreeMap::new(),
+					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					is_connected: true,
 				}));
 			},
@@ -7787,6 +7871,7 @@ where
 				latest_features: Readable::read(reader)?,
 				pending_msg_events: Vec::new(),
 				monitor_update_blocked_actions: BTreeMap::new(),
+				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
 			};
 			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -7869,7 +7954,7 @@ where
 		let mut probing_cookie_secret: Option<[u8; 32]> = None;
 		let mut claimable_htlc_purposes = None;
 		let mut pending_claiming_payments = Some(HashMap::new());
-		let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+		let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
 		let mut events_override = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
@@ -8179,7 +8264,21 @@ where
 		}

 		for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
-			if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+			if let Some(peer_state) = per_peer_state.get(&node_id) {
+				for (_, actions) in monitor_update_blocked_actions.iter() {
+					for action in actions.iter() {
+						if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+							downstream_counterparty_and_funding_outpoint:
+								Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
+						} = action {
+							if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+								blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
+									.entry(blocked_channel_outpoint.to_channel_id())
+									.or_insert_with(Vec::new).push(blocking_action.clone());
+							}
+						}
+					}
+				}
 				peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
 			} else {
 				log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
