@@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid;
 use chain;
 use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::atomic_counter::AtomicCounter;
@@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
-use core::sync::atomic::{AtomicBool, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
@@ -168,6 +168,13 @@ struct MonitorHolder<ChannelSigner: Sign> {
 	/// processed the closure event, we set this to true and return PermanentFailure for any other
 	/// chain::Watch events.
 	channel_perm_failed: AtomicBool,
+	/// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
+	/// in `pending_monitor_updates`.
+	/// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
+	/// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
+	/// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
+	/// forever, risking funds loss.
+	last_chain_persist_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
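To make the new field's semantics concrete, here is a minimal, self-contained sketch of the staleness test that `release_pending_monitor_events` performs further down. This is not LDK's API: `HolderSketch` and `persist_overly_delayed` are illustrative names, and the constant's value of 3 is an assumption standing in for `LATENCY_GRACE_PERIOD_BLOCKS`.

```rust
use core::sync::atomic::{AtomicUsize, Ordering};

// Assumed stand-in for chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS.
const LATENCY_GRACE_PERIOD_BLOCKS: usize = 3;

struct HolderSketch {
	// Height at which the current wait on a ChainSync persist began.
	last_chain_persist_height: AtomicUsize,
}

impl HolderSketch {
	// True once a pending ChainSync persist has been outstanding for more
	// than the grace period, i.e. events must be released despite the sync.
	fn persist_overly_delayed(&self, highest_chain_height: usize) -> bool {
		self.last_chain_persist_height.load(Ordering::Acquire)
			+ LATENCY_GRACE_PERIOD_BLOCKS <= highest_chain_height
	}
}

fn main() {
	let holder = HolderSketch { last_chain_persist_height: AtomicUsize::new(100) };
	assert!(!holder.persist_overly_delayed(102)); // still within the grace period
	assert!(holder.persist_overly_delayed(103));  // grace period exceeded
}
```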
@@ -226,6 +233,8 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
 	/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
 	/// from the user and not from a [`ChannelMonitor`].
 	pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+	/// The best block height seen, used as a proxy for the passage of time.
+	highest_chain_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -244,13 +253,25 @@ where C::Target: chain::Filter,
 	/// calls must not exclude any transactions matching the new outputs nor any in-block
 	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
 	/// updated `txdata`.
-	fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+	///
+	/// Calls which represent a new blockchain tip height should set `best_height`.
+	fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
 	where
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
 		let mut dependent_txdata = Vec::new();
 		{
 			let monitor_states = self.monitors.write().unwrap();
+			if let Some(height) = best_height {
+				// If the best block height is being updated, update highest_chain_height under the
+				// monitors write lock.
+				let old_height = self.highest_chain_height.load(Ordering::Acquire);
+				let new_height = height as usize;
+				if new_height > old_height {
+					self.highest_chain_height.store(new_height, Ordering::Release);
+				}
+			}
+
 			for (funding_outpoint, monitor_state) in monitor_states.iter() {
 				let monitor = &monitor_state.monitor;
 				let mut txn_outputs;
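The load-then-conditional-store above is race-free only because it runs under the `monitors` write lock, which serializes all height updates. Purely as a sketch of the lock-free alternative (not what the patch does), `AtomicUsize::fetch_max` (stable since Rust 1.45) expresses the same monotonic maximum in a single atomic operation:

```rust
use core::sync::atomic::{AtomicUsize, Ordering};

// Record a possibly-new best height; stale heights are ignored atomically.
fn record_height(highest_chain_height: &AtomicUsize, height: u32) {
	highest_chain_height.fetch_max(height as usize, Ordering::AcqRel);
}

fn main() {
	let h = AtomicUsize::new(5);
	record_height(&h, 7);
	record_height(&h, 6); // out-of-order update, correctly ignored
	assert_eq!(h.load(Ordering::Acquire), 7);
}
```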
@@ -260,6 +281,14 @@ where C::Target: chain::Filter,
 					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
 				};
 				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+				if let Some(height) = best_height {
+					if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+						// If there are no ChainSync persists awaiting completion, go ahead and
+						// set last_chain_persist_height here - we wouldn't want the first
+						// TemporaryFailure to always immediately be considered "overly delayed".
+						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+					}
+				}
 
 				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
 				match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
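The helper `has_pending_chainsync_updates` is not part of this diff. A plausible shape for it, sketched here with simplified stand-in types rather than the file's real `MutexGuard`-based signature, is just a scan of the queued update IDs for any ChainSync-origin entry:

```rust
// Simplified stand-ins for this file's MonitorUpdateId / UpdateOrigin types.
enum UpdateOrigin { OffChain(u64), ChainSync(u64) }
struct MonitorUpdateId { contents: UpdateOrigin }

fn has_pending_chainsync_updates(pending: &[MonitorUpdateId]) -> bool {
	pending.iter().any(|id| matches!(id.contents, UpdateOrigin::ChainSync(_)))
}

fn main() {
	let pending = vec![
		MonitorUpdateId { contents: UpdateOrigin::OffChain(1) },
		MonitorUpdateId { contents: UpdateOrigin::ChainSync(2) },
	];
	assert!(has_pending_chainsync_updates(&pending));
}
```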
@@ -304,7 +333,7 @@ where C::Target: chain::Filter,
 			dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
 			dependent_txdata.dedup_by_key(|(index, _tx)| *index);
 			let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-			self.process_chain_data(header, &txdata, process);
+			self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
 		}
 	}
 
@@ -325,6 +354,7 @@ where C::Target: chain::Filter,
 			fee_estimator: feeest,
 			persister,
 			pending_monitor_events: Mutex::new(Vec::new()),
+			highest_chain_height: AtomicUsize::new(0),
 		}
 	}
 
@@ -428,9 +458,11 @@ where C::Target: chain::Filter,
 				});
 			},
 			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
-				// We've already done everything we need to, the next time
-				// release_pending_monitor_events is called, any events for this ChannelMonitor
-				// will be returned if there's no more SyncPersistId events left.
+				if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
+					monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+					// The next time release_pending_monitor_events is called, any events for this
+					// ChannelMonitor will be returned.
+				}
 			},
 		}
 		Ok(())
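When the last outstanding ChainSync persist completes, the grace period restarts from the current tip. A compact model of that hand-off follows; the names are illustrative and a plain counter stands in for the real list of pending update IDs, so this is a sketch of the bookkeeping, not LDK's implementation.

```rust
use core::sync::atomic::{AtomicUsize, Ordering};

struct ClockSketch {
	highest_chain_height: AtomicUsize,
	last_chain_persist_height: AtomicUsize,
	pending_chainsync_persists: AtomicUsize, // stand-in for the update-ID list
}

impl ClockSketch {
	fn chainsync_persist_completed(&self) {
		// fetch_sub returns the previous value; 1 means this was the last one.
		if self.pending_chainsync_persists.fetch_sub(1, Ordering::AcqRel) == 1 {
			// No ChainSync persists remain: restart the grace period at the tip.
			self.last_chain_persist_height.store(
				self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
		}
	}
}

fn main() {
	let c = ClockSketch {
		highest_chain_height: AtomicUsize::new(110),
		last_chain_persist_height: AtomicUsize::new(100),
		pending_chainsync_persists: AtomicUsize::new(1),
	};
	c.chainsync_persist_completed();
	assert_eq!(c.last_chain_persist_height.load(Ordering::Acquire), 110);
}
```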
@@ -470,7 +502,7 @@ where
 		let header = &block.header;
 		let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
 		log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
-		self.process_chain_data(header, &txdata, |monitor, txdata| {
+		self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
 			monitor.block_connected(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -497,7 +529,7 @@ where
 {
 	fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
 		log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
-		self.process_chain_data(header, txdata, |monitor, txdata| {
+		self.process_chain_data(header, None, txdata, |monitor, txdata| {
 			monitor.transactions_confirmed(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -513,7 +545,7 @@ where
 
 	fn best_block_updated(&self, header: &BlockHeader, height: u32) {
 		log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
-		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
 			// While in practice there shouldn't be any recursive calls when given empty txdata,
 			// it's still possible if a chain::Filter implementation returns a transaction.
 			debug_assert!(txdata.is_empty());
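Across the three entry points touched above, only calls that represent a new chain tip advance the height proxy; `transactions_confirmed` may report confirmations in blocks behind the tip, so it passes `None`. A toy dispatch table capturing just that rule (the enum and function are illustrative, not LDK types):

```rust
enum ChainCall {
	BlockConnected { height: u32 },
	BestBlockUpdated { height: u32 },
	TransactionsConfirmed { height: u32 },
}

// Which calls should set `best_height` when forwarding to process_chain_data.
fn best_height_for(call: &ChainCall) -> Option<u32> {
	match call {
		ChainCall::BlockConnected { height } => Some(*height),
		ChainCall::BestBlockUpdated { height } => Some(*height),
		// Confirmations can arrive for non-tip blocks; don't advance the clock.
		ChainCall::TransactionsConfirmed { .. } => None,
	}
}

fn main() {
	assert_eq!(best_height_for(&ChainCall::BestBlockUpdated { height: 7 }), Some(7));
	assert_eq!(best_height_for(&ChainCall::TransactionsConfirmed { height: 7 }), None);
}
```

Fixing the diff line above: the original `-` line passed only `header, &[]`; the `+` line threads `Some(height)` through, matching `block_connected`.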
@@ -580,6 +612,7 @@ where C::Target: chain::Filter,
 			monitor,
 			pending_monitor_updates: Mutex::new(pending_monitor_updates),
 			channel_perm_failed: AtomicBool::new(false),
+			last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
 		});
 		persist_res
 	}
@@ -636,7 +669,10 @@ where C::Target: chain::Filter,
 		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
 			let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-			if is_pending_monitor_update {
+			if is_pending_monitor_update &&
+					monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
+						> self.highest_chain_height.load(Ordering::Acquire)
+			{
 				log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
 			} else {
 				if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
@@ -650,6 +686,11 @@ where C::Target: chain::Filter,
 					// updated.
 					log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
 				}
+				if is_pending_monitor_update {
+					log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+					log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
+					log_error!(self.logger, " This may cause duplicate payment events to be generated.");
+				}
 				pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
 			}
 		}
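Putting the gate and the new logging together, each monitor lands in one of three outcomes per release attempt: withheld (sync pending, within grace), released with an error log (sync pending, overly delayed), or released normally. A compact, illustrative model of that control flow, with booleans in place of the real `MonitorHolder` state:

```rust
// Models the release decision from the two hunks above (illustrative only):
// events are withheld only while a ChainSync persist is pending *and* still
// within the grace period; otherwise they are released, with warnings logged
// where applicable.
fn release_decision(pending_chainsync: bool, overly_delayed: bool, perm_failed: bool)
	-> (bool, Vec<&'static str>)
{
	let mut logs = Vec::new();
	if pending_chainsync && !overly_delayed {
		logs.push("info: Channel Monitor sync still in progress, refusing events");
		(false, logs)
	} else {
		if perm_failed {
			logs.push("info: sync returned PermanentFailure; duplicates possible after reload");
		}
		if pending_chainsync {
			logs.push("error: sync exceeded grace period; releasing events to avoid funds-loss");
		}
		(true, logs)
	}
}

fn main() {
	assert_eq!(release_decision(true, false, false).0, false); // withheld
	assert_eq!(release_decision(true, true, false).0, true);   // released, with error log
	assert_eq!(release_decision(false, false, false).0, true); // released normally
}
```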