Skip to content

Commit cc88533

Browse files
committed
Persist NetworkGraph on removal of stale channels
Add comment to log_warn macro
1 parent a29aa23 commit cc88533

File tree

3 files changed

+72
-15
lines changed

3 files changed

+72
-15
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,25 @@ ChannelManagerPersister<Signer, M, T, K, F, L> for Fun where
107107
}
108108
}
109109

110+
/// Trait which handles persisting a [`NetworkGraph`] to disk.
111+
///
112+
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
113+
pub trait NetworkGraphPersister {
114+
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed
115+
/// which will cause the [`BackgroundProcessor`] which called this method to exit
116+
///
117+
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
118+
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
119+
}
120+
121+
impl<Fun> NetworkGraphPersister for Fun where
122+
Fun: Fn(&NetworkGraph) -> Result<(), std::io::Error>,
123+
{
124+
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
125+
self(network_graph)
126+
}
127+
}
128+
110129
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
111130
struct DecoratingEventHandler<
112131
E: EventHandler,
@@ -190,8 +209,9 @@ impl BackgroundProcessor {
190209
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
191210
UMH: 'static + Deref + Send + Sync,
192211
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
212+
NGP: 'static + Send + NetworkGraphPersister,
193213
>(
194-
persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM,
214+
channel_manager_persister: CMP, network_graph_persister: NGP, event_handler: EH, chain_monitor: M, channel_manager: CM,
195215
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
196216
) -> Self
197217
where
@@ -234,7 +254,13 @@ impl BackgroundProcessor {
234254

235255
if updates_available {
236256
log_trace!(logger, "Persisting ChannelManager...");
237-
persister.persist_manager(&*channel_manager)?;
257+
channel_manager_persister.persist_manager(&*channel_manager)?;
258+
if let Some(ref handler) = net_graph_msg_handler {
259+
if network_graph_persister.persist_graph(handler.network_graph()).is_err() {
260+
log_warn!(logger, "Warning: Failed to persist network graph, check your disk and permissions");
261+
}
262+
}
263+
238264
log_trace!(logger, "Done persisting ChannelManager.");
239265
}
240266
// Exit the loop if the background processor was requested to stop.
@@ -277,6 +303,9 @@ impl BackgroundProcessor {
277303
if let Some(ref handler) = net_graph_msg_handler {
278304
log_trace!(logger, "Pruning network graph of stale entries");
279305
handler.network_graph().remove_stale_channels();
306+
if network_graph_persister.persist_graph(handler.network_graph()).is_err() {
307+
log_warn!(logger, "Warning: Failed to persist network graph, check your disk and permissions");
308+
}
280309
last_prune_call = Instant::now();
281310
have_pruned = true;
282311
}
@@ -285,7 +314,7 @@ impl BackgroundProcessor {
285314
// After we exit, ensure we persist the ChannelManager one final time - this avoids
286315
// some races where users quit while channel updates were in-flight, with
287316
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
288-
persister.persist_manager(&*channel_manager)
317+
channel_manager_persister.persist_manager(&*channel_manager)
289318
});
290319
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
291320
}
@@ -525,9 +554,11 @@ mod tests {
525554

526555
// Initiate the background processors to watch each node.
527556
let data_dir = nodes[0].persister.get_data_dir();
528-
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
557+
let channel_manager_persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
558+
let data_dir = nodes[0].persister.get_data_dir();
559+
let network_graph_persister = move |graph: &NetworkGraph| FilesystemPersister::persist_network_graph(data_dir.clone(), graph);
529560
let event_handler = |_: &_| {};
530-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
561+
let bg_processor = BackgroundProcessor::start(channel_manager_persister, network_graph_persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
531562

532563
macro_rules! check_persisted_data {
533564
($node: expr, $filepath: expr, $expected_bytes: expr) => {
@@ -556,6 +587,7 @@ mod tests {
556587
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
557588
let mut expected_bytes = Vec::new();
558589
check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
590+
559591
loop {
560592
if !nodes[0].node.get_persistence_condvar_value() { break }
561593
}
@@ -570,6 +602,14 @@ mod tests {
570602
if !nodes[0].node.get_persistence_condvar_value() { break }
571603
}
572604

605+
// Check network graph is persisted
606+
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
607+
let mut expected_bytes = Vec::new();
608+
if let Some(handler) = nodes[0].net_graph_msg_handler.clone() {
609+
let network_graph = handler.network_graph();
610+
check_persisted_data!(network_graph, filepath.clone(), expected_bytes);
611+
}
612+
573613
assert!(bg_processor.stop().is_ok());
574614
}
575615

@@ -579,9 +619,11 @@ mod tests {
579619
// `FRESHNESS_TIMER`.
580620
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
581621
let data_dir = nodes[0].persister.get_data_dir();
582-
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
622+
let channel_manager_persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
623+
let data_dir = nodes[0].persister.get_data_dir();
624+
let network_graph_persister = move |graph: &NetworkGraph| FilesystemPersister::persist_network_graph(data_dir.clone(), graph);
583625
let event_handler = |_: &_| {};
584-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
626+
let bg_processor = BackgroundProcessor::start(channel_manager_persister, network_graph_persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
585627
loop {
586628
let log_entries = nodes[0].logger.lines.lock().unwrap();
587629
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -596,14 +638,16 @@ mod tests {
596638
}
597639

598640
#[test]
599-
fn test_persist_error() {
641+
fn test_channel_manager_persist_error() {
600642
// Test that if we encounter an error during manager persistence, the thread panics.
601643
let nodes = create_nodes(2, "test_persist_error".to_string());
602644
open_channel!(nodes[0], nodes[1], 100000);
603645

604-
let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test"));
646+
let channel_manager_persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test"));
647+
let data_dir = nodes[0].persister.get_data_dir();
648+
let network_graph_persister = move |graph: &NetworkGraph| FilesystemPersister::persist_network_graph(data_dir.clone(), graph);
605649
let event_handler = |_: &_| {};
606-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
650+
let bg_processor = BackgroundProcessor::start(channel_manager_persister, network_graph_persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
607651
match bg_processor.join() {
608652
Ok(_) => panic!("Expected error persisting manager"),
609653
Err(e) => {
@@ -618,14 +662,16 @@ mod tests {
618662
let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
619663
let channel_value = 100000;
620664
let data_dir = nodes[0].persister.get_data_dir();
621-
let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node);
665+
let channel_manager_persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
666+
let data_dir = nodes[0].persister.get_data_dir();
667+
let network_graph_persister = move |graph: &NetworkGraph| FilesystemPersister::persist_network_graph(data_dir.clone(), graph);
622668

623669
// Set up a background event handler for FundingGenerationReady events.
624670
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
625671
let event_handler = move |event: &Event| {
626672
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
627673
};
628-
let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
674+
let bg_processor = BackgroundProcessor::start(channel_manager_persister.clone(), network_graph_persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
629675

630676
// Open a channel and check that the FundingGenerationReady event was handled.
631677
begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -649,7 +695,7 @@ mod tests {
649695
// Set up a background event handler for SpendableOutputs events.
650696
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
651697
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
652-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
698+
let bg_processor = BackgroundProcessor::start(channel_manager_persister, network_graph_persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
653699

654700
// Force close the channel and check that the SpendableOutputs event was handled.
655701
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
@@ -675,12 +721,14 @@ mod tests {
675721

676722
// Initiate the background processors to watch each node.
677723
let data_dir = nodes[0].persister.get_data_dir();
678-
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
724+
let channel_manager_persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
725+
let data_dir = nodes[0].persister.get_data_dir();
726+
let network_graph_persister = move |graph: &NetworkGraph| FilesystemPersister::persist_network_graph(data_dir.clone(), graph);
679727
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
680728
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
681729
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
682730
let event_handler = Arc::clone(&invoice_payer);
683-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
731+
let bg_processor = BackgroundProcessor::start(channel_manager_persister, network_graph_persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
684732
assert!(bg_processor.stop().is_ok());
685733
}
686734
}

lightning-persister/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ impl FilesystemPersister {
110110
util::write_to_file(path, "manager".to_string(), manager)
111111
}
112112

113+
/// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister`
114+
/// initialization, within a file called "network_graph"
115+
pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
116+
let path = PathBuf::from(data_dir);
117+
util::write_to_file(path, "network_graph".to_string(), network_graph)
118+
}
119+
113120
/// Read `ChannelMonitor`s from disk.
114121
pub fn read_channelmonitors<Signer: Sign, K: Deref> (
115122
&self, keys_manager: K

lightning/src/util/macro_logger.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ macro_rules! log_error {
193193
)
194194
}
195195

196+
/// Log a warning
197+
#[macro_export]
196198
macro_rules! log_warn {
197199
($logger: expr, $($arg:tt)*) => (
198200
log_given_level!($logger, $crate::util::logger::Level::Warn, $($arg)*);

0 commit comments

Comments
 (0)