@@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid;
 use chain;
 use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::atomic_counter::AtomicCounter;
@@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
-use core::sync::atomic::{AtomicBool, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
@@ -168,6 +168,13 @@ struct MonitorHolder<ChannelSigner: Sign> {
 	/// processed the closure event, we set this to true and return PermanentFailure for any other
 	/// chain::Watch events.
 	channel_perm_failed: AtomicBool,
+	/// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
+	/// in `pending_monitor_updates`.
+	/// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
+	/// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
+	/// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
+	/// forever, risking funds loss.
+	last_chain_persist_height: AtomicUsize,
 }

 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
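The doc comment above describes a height-based grace period. A minimal sketch of that arithmetic, assuming a hypothetical `HolderStub` type and a stand-in `GRACE_PERIOD_BLOCKS` value (the real `LATENCY_GRACE_PERIOD_BLOCKS` constant lives in `chain::channelmonitor` and may differ):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for LATENCY_GRACE_PERIOD_BLOCKS; illustrative value only.
const GRACE_PERIOD_BLOCKS: usize = 3;

// Hypothetical reduction of MonitorHolder to the one field under discussion.
struct HolderStub {
    last_chain_persist_height: AtomicUsize,
}

impl HolderStub {
    // A pending chain-sync persist is "overly delayed" once more than the
    // grace period has elapsed since we started waiting on it.
    fn persist_overly_delayed(&self, highest_chain_height: usize) -> bool {
        self.last_chain_persist_height.load(Ordering::Acquire) + GRACE_PERIOD_BLOCKS
            <= highest_chain_height
    }
}

fn main() {
    let holder = HolderStub { last_chain_persist_height: AtomicUsize::new(100) };
    assert!(!holder.persist_overly_delayed(102)); // still within the grace period
    assert!(holder.persist_overly_delayed(103));  // grace period exhausted
}
```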
@@ -226,6 +233,8 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
 	/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
 	/// from the user and not from a [`ChannelMonitor`].
 	pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+	/// The best block height seen, used as a proxy for the passage of time.
+	highest_chain_height: AtomicUsize,
 }

 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -244,11 +253,16 @@ where C::Target: chain::Filter,
 	/// calls must not exclude any transactions matching the new outputs nor any in-block
 	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
 	/// updated `txdata`.
-	fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+	///
+	/// Calls which represent a new blockchain tip height should set `best_height`.
+	fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
 	where
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
 		let mut dependent_txdata = Vec::new();
+		if let Some(height) = best_height {
+			self.highest_chain_height.store(height as usize, Ordering::Release);
+		}
 		{
 			let monitor_states = self.monitors.write().unwrap();
 			for (funding_outpoint, monitor_state) in monitor_states.iter() {
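A sketch of why `best_height` is an `Option`: only tip-advancing entry points pass `Some(height)`, so calls that merely replay transactions (and the recursive dependent-transaction pass) never move the height proxy. `ChainMonitorStub` is a hypothetical reduction, not the actual LDK type:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stub holding only the height proxy from the diff above.
struct ChainMonitorStub {
    highest_chain_height: AtomicUsize,
}

impl ChainMonitorStub {
    fn process_chain_data(&self, best_height: Option<u32>) {
        // block_connected / best_block_updated pass Some(height);
        // transactions_confirmed and the recursive re-scan pass None,
        // leaving the "clock" untouched.
        if let Some(height) = best_height {
            self.highest_chain_height.store(height as usize, Ordering::Release);
        }
        // ... per-monitor persistence would follow here ...
    }
}

fn main() {
    let mon = ChainMonitorStub { highest_chain_height: AtomicUsize::new(0) };
    mon.process_chain_data(Some(700_000)); // new tip: clock advances
    mon.process_chain_data(None);          // re-scan: clock unchanged
    assert_eq!(mon.highest_chain_height.load(Ordering::Acquire), 700_000);
}
```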
@@ -260,6 +274,14 @@ where C::Target: chain::Filter,
 					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
 				};
 				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+				if let Some(height) = best_height {
+					if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+						// If there are no ChainSync persists awaiting completion, go ahead and
+						// set last_chain_persist_height here - we wouldn't want the first
+						// TemporaryFailure to always immediately be considered "overly delayed".
+						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+					}
+				}

 				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
 				match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
@@ -304,7 +326,7 @@ where C::Target: chain::Filter,
 			dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
 			dependent_txdata.dedup_by_key(|(index, _tx)| *index);
 			let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-			self.process_chain_data(header, &txdata, process);
+			self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
 		}
 	}

@@ -325,6 +347,7 @@ where C::Target: chain::Filter,
 			fee_estimator: feeest,
 			persister,
 			pending_monitor_events: Mutex::new(Vec::new()),
+			highest_chain_height: AtomicUsize::new(0),
 		}
 	}

@@ -428,9 +451,11 @@ where C::Target: chain::Filter,
 				});
 			},
 			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
-				// We've already done everything we need to, the next time
-				// release_pending_monitor_events is called, any events for this ChannelMonitor
-				// will be returned if there's no more SyncPersistId events left.
+				if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
+					monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+					// The next time release_pending_monitor_events is called, any events for this
+					// ChannelMonitor will be returned.
+				}
 			},
 		}
 		Ok(())
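The hunk above restarts the grace-period clock when the last outstanding ChainSync persist completes. A hypothetical, self-contained mirror of that completion path (names are illustrative, not LDK API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Once no ChainSync persists remain pending for a monitor, reset its
// grace-period clock to the current best height so a future slow persist
// is measured from now, not from the original registration height.
fn on_chainsync_persist_completed(
    last_chain_persist_height: &AtomicUsize,
    highest_chain_height: &AtomicUsize,
    still_has_pending_chainsync_updates: bool,
) {
    if !still_has_pending_chainsync_updates {
        last_chain_persist_height
            .store(highest_chain_height.load(Ordering::Acquire), Ordering::Release);
    }
}

fn main() {
    let last = AtomicUsize::new(0);
    let best = AtomicUsize::new(105);
    on_chainsync_persist_completed(&last, &best, false);
    assert_eq!(last.load(Ordering::Acquire), 105);
}
```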
@@ -470,7 +495,7 @@ where
 		let header = &block.header;
 		let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
 		log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
-		self.process_chain_data(header, &txdata, |monitor, txdata| {
+		self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
 			monitor.block_connected(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -497,7 +522,7 @@ where
 {
 	fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
 		log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
-		self.process_chain_data(header, txdata, |monitor, txdata| {
+		self.process_chain_data(header, None, txdata, |monitor, txdata| {
 			monitor.transactions_confirmed(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -513,7 +538,7 @@ where

 	fn best_block_updated(&self, header: &BlockHeader, height: u32) {
 		log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
-		self.process_chain_data(header, &[], |monitor, txdata| {
+		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
 			// While in practice there shouldn't be any recursive calls when given empty txdata,
 			// it's still possible if a chain::Filter implementation returns a transaction.
 			debug_assert!(txdata.is_empty());
@@ -580,6 +605,7 @@ where C::Target: chain::Filter,
 			monitor,
 			pending_monitor_updates: Mutex::new(pending_monitor_updates),
 			channel_perm_failed: AtomicBool::new(false),
+			last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
 		});
 		persist_res
 	}
@@ -636,7 +662,10 @@ where C::Target: chain::Filter,
 		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
 			let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-			if is_pending_monitor_update {
+			if is_pending_monitor_update &&
+					monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
+						> self.highest_chain_height.load(Ordering::Acquire)
+			{
 				log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
 			} else {
 				if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
@@ -650,6 +679,11 @@ where C::Target: chain::Filter,
 					// updated.
 					log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
 				}
+				if is_pending_monitor_update {
+					log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+					log_error!(self.logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
+					log_error!(self.logger, "   This may cause duplicate payment events to be generated.");
+				}
 				pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
 			}
 		}
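Putting the two heights together, the event gating in `release_pending_monitor_events` reduces to a single comparison. A self-contained sketch with hypothetical names and an assumed grace-period value:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const LATENCY_GRACE_PERIOD_BLOCKS: usize = 3; // assumed value for illustration

// Hypothetical per-monitor state mirroring the field added in this diff.
struct MonitorStateStub {
    last_chain_persist_height: AtomicUsize,
}

// Hold a monitor's events only while a chain-sync persist is pending *and*
// it started within the last LATENCY_GRACE_PERIOD_BLOCKS blocks; otherwise
// release them (accepting possible duplicates) rather than risk holding an
// HTLC preimage back from an upstream channel forever.
fn should_hold_events(
    state: &MonitorStateStub, has_pending_chainsync_updates: bool, highest_chain_height: usize,
) -> bool {
    has_pending_chainsync_updates
        && state.last_chain_persist_height.load(Ordering::Acquire)
            + LATENCY_GRACE_PERIOD_BLOCKS
            > highest_chain_height
}

fn main() {
    let state = MonitorStateStub { last_chain_persist_height: AtomicUsize::new(100) };
    assert!(should_hold_events(&state, true, 102));   // sync pending, still fresh
    assert!(!should_hold_events(&state, true, 103));  // overly delayed: release
    assert!(!should_hold_events(&state, false, 102)); // nothing pending: release
}
```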