
Persist NetworkGraph on removal of stale channels #1376


Merged · 3 commits · Mar 30, 2022
167 changes: 122 additions & 45 deletions lightning-background-processor/src/lib.rs
@@ -75,10 +75,13 @@ const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

-/// Trait which handles persisting a [`ChannelManager`] to disk.
-///
-/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
-pub trait ChannelManagerPersister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+#[cfg(not(test))]
+const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
+#[cfg(test)]
+const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
+
+/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
+pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
@@ -87,24 +90,11 @@ where
L::Target: 'static + Logger,
{
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
-/// (which will cause the [`BackgroundProcessor`] which called this method to exit.
-///
-/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+/// (which will cause the [`BackgroundProcessor`] which called this method to exit).
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
-}

-impl<Fun, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-ChannelManagerPersister<Signer, M, T, K, F, L> for Fun where
-M::Target: 'static + chain::Watch<Signer>,
-T::Target: 'static + BroadcasterInterface,
-K::Target: 'static + KeysInterface<Signer = Signer>,
-F::Target: 'static + FeeEstimator,
-L::Target: 'static + Logger,
-Fun: Fn(&ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>,
-{
-fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
-self(channel_manager)
-}
+/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
+fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
}
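For readers implementing the combined trait downstream, a minimal sketch of what an implementation might look like (the `DataDirPersister` type is hypothetical; it simply delegates to the `FilesystemPersister` helpers, mirroring the test `Persister` further down this diff):

```rust
use std::ops::Deref;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::keysinterface::{KeysInterface, Sign};
use lightning::ln::channelmanager::ChannelManager;
use lightning::routing::network_graph::NetworkGraph;
use lightning::util::logger::Logger;
use lightning_background_processor::Persister;
use lightning_persister::FilesystemPersister;

/// Hypothetical example type: persists both objects under one data directory.
struct DataDirPersister { data_dir: String }

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
	Persister<Signer, M, T, K, F, L> for DataDirPersister
where
	M::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
{
	fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
		// Writes a "manager" file under data_dir.
		FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
	}

	fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
		// Writes a "network_graph" file under data_dir (added by this PR).
		FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
	}
}
```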

/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
@@ -141,17 +131,21 @@ impl BackgroundProcessor {
/// documentation].
///
/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
-/// `persist_manager` returns an error. In case of an error, the error is retrieved by calling
+/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
/// either [`join`] or [`stop`].
///
/// # Data Persistence
///
-/// `persist_manager` is responsible for writing out the [`ChannelManager`] to disk, and/or
+/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
/// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's
/// provided implementation.
///
-/// Typically, users should either implement [`ChannelManagerPersister`] to never return an
+/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
+/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
+/// for Rust-Lightning's provided implementation.
+///
+/// Typically, users should either implement [`Persister::persist_manager`] to never return an
/// error or call [`join`] and handle any error that may arise. For the latter case,
/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
///
@@ -168,7 +162,9 @@ impl BackgroundProcessor {
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
/// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
pub fn start<
Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
@@ -184,14 +180,14 @@
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
-CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
+PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
>(
-persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM,
+persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
) -> Self
where
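To see the new parameter in context, a hedged wiring sketch (modeled on the tests below; `Persister::new` here is the test helper defined later in this file, and a real deployment would substitute its own `Persister` implementation):

```rust
// Sketch: assumes chain_monitor, channel_manager, net_graph_msg_handler,
// peer_manager, logger and data_dir are already constructed as usual.
let persister = Persister::new(data_dir);
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(
	persister, event_handler, chain_monitor, channel_manager,
	// Passing None here skips both graph pruning and graph persistence.
	Some(net_graph_msg_handler), peer_manager, logger,
);
// stop() joins the background thread and surfaces any persistence error.
if let Err(e) = bg_processor.stop() {
	eprintln!("background persistence failed: {}", e);
}
```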
@@ -273,19 +269,29 @@ impl BackgroundProcessor {
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
-if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } {
+if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
if let Some(ref handler) = net_graph_msg_handler {
log_trace!(logger, "Pruning network graph of stale entries");
handler.network_graph().remove_stale_channels();
if let Err(e) = persister.persist_graph(handler.network_graph()) {
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}
last_prune_call = Instant::now();
have_pruned = true;
}
}
}

// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-persister.persist_manager(&*channel_manager)
+persister.persist_manager(&*channel_manager)?;

Collaborator (inline review): Let's do this after the channel manager - the manager is much more important, and if the network graph fails to be persisted because the user kills the process during shutdown it's not a big deal.

// Persist NetworkGraph on exit
if let Some(ref handler) = net_graph_msg_handler {
persister.persist_graph(handler.network_graph())?;
}
Ok(())
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
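The prune cadence above can be hard to read inline, so here is a simplified, self-contained sketch of just the timer logic, with the pruning and persistence steps stubbed out:

```rust
use std::time::Instant;

const NETWORK_PRUNE_TIMER: u64 = 60 * 60; // hourly, as above
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60; // first run 60s after startup

fn prune_loop_sketch() {
	let mut last_prune_call = Instant::now();
	let mut have_pruned = false;
	loop {
		// ... normal event/peer processing happens each iteration ...
		let timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
		if last_prune_call.elapsed().as_secs() > timer {
			// remove_stale_channels() and persist_graph() would run here.
			last_prune_call = Instant::now();
			have_pruned = true;
		}
	}
}
```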
@@ -343,9 +349,10 @@ mod tests {
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::network::constants::Network;
-use lightning::chain::{BestBlock, Confirm, chainmonitor};
+use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
+use lightning::chain::{BestBlock, Confirm, chainmonitor, self};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
+use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager, Sign};
use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg;
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
@@ -355,12 +362,14 @@ mod tests {
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
use lightning_invoice::utils::DefaultRouter;
use lightning_persister::FilesystemPersister;
use std::fs;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -402,6 +411,48 @@ mod tests {
}
}

struct Persister {
data_dir: String,
graph_error: Option<(std::io::ErrorKind, &'static str)>,
manager_error: Option<(std::io::ErrorKind, &'static str)>
}

impl Persister {
fn new(data_dir: String) -> Self {
Self { data_dir, graph_error: None, manager_error: None }
}

fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { graph_error: Some((error, message)), ..self }
}

fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { manager_error: Some((error, message)), ..self }
}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
{
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
match self.manager_error {
None => FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager),
Some((error, message)) => Err(std::io::Error::new(error, message)),
}
}

fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
match self.graph_error {
None => FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph),
Some((error, message)) => Err(std::io::Error::new(error, message)),
}
}
}

fn get_full_filepath(filepath: String, filename: String) -> String {
let mut path = PathBuf::from(filepath);
path.push(filename);
@@ -525,19 +576,20 @@ mod tests {

// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
-let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+let persister = Persister::new(data_dir);
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());

macro_rules! check_persisted_data {
-($node: expr, $filepath: expr, $expected_bytes: expr) => {
+($node: expr, $filepath: expr) => {
+let mut expected_bytes = Vec::new();
loop {
-$expected_bytes.clear();
-match $node.write(&mut $expected_bytes) {
+expected_bytes.clear();
+match $node.write(&mut expected_bytes) {
Ok(()) => {
match std::fs::read($filepath) {
Ok(bytes) => {
-if bytes == $expected_bytes {
+if bytes == expected_bytes {
break
} else {
continue
@@ -554,8 +606,8 @@

// Check that the initial channel manager data is persisted as expected.
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
-let mut expected_bytes = Vec::new();
-check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
+check_persisted_data!(nodes[0].node, filepath.clone());

loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
}
@@ -564,12 +616,18 @@
nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()).unwrap();

// Check that the force-close updates are persisted.
-let mut expected_bytes = Vec::new();
-check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
+check_persisted_data!(nodes[0].node, filepath.clone());
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
}

// Check network graph is persisted
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
if let Some(ref handler) = nodes[0].net_graph_msg_handler {
let network_graph = handler.network_graph();
check_persisted_data!(network_graph, filepath.clone());
}

assert!(bg_processor.stop().is_ok());
}

Expand All @@ -579,7 +637,7 @@ mod tests {
// `FRESHNESS_TIMER`.
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
let data_dir = nodes[0].persister.get_data_dir();
-let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+let persister = Persister::new(data_dir);
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
loop {
Expand All @@ -596,12 +654,13 @@ mod tests {
}

#[test]
-fn test_persist_error() {
+fn test_channel_manager_persist_error() {
// Test that if we encounter an error during manager persistence, the thread panics.
let nodes = create_nodes(2, "test_persist_error".to_string());
open_channel!(nodes[0], nodes[1], 100000);

-let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test"));
+let data_dir = nodes[0].persister.get_data_dir();
+let persister = Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test");
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
match bg_processor.join() {
@@ -613,19 +672,37 @@
}
}

#[test]
fn test_network_graph_persist_error() {
// Test that if we encounter an error during network graph persistence, an error gets returned.
let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test");
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());

match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting network graph"),
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::Other);
assert_eq!(e.get_ref().unwrap().to_string(), "test");
},
}
}

#[test]
fn test_background_event_handling() {
let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
let channel_value = 100000;
let data_dir = nodes[0].persister.get_data_dir();
-let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node);
+let persister = Persister::new(data_dir.clone());

// Set up a background event handler for FundingGenerationReady events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| {
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
};
-let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());

// Open a channel and check that the FundingGenerationReady event was handled.
begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -649,7 +726,7 @@ mod tests {
// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
-let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+let bg_processor = BackgroundProcessor::start(Persister::new(data_dir), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());

// Force close the channel and check that the SpendableOutputs event was handled.
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
@@ -675,7 +752,7 @@ mod tests {

// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
-let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+let persister = Persister::new(data_dir);
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
14 changes: 14 additions & 0 deletions lightning-persister/src/lib.rs
@@ -16,6 +16,7 @@ extern crate libc;

use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::hashes::hex::{FromHex, ToHex};
use lightning::routing::network_graph::NetworkGraph;
use crate::util::DiskWriteable;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
@@ -66,6 +67,12 @@
}
}

impl DiskWriteable for NetworkGraph {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
self.write(writer)
}
}

impl FilesystemPersister {
/// Initialize a new FilesystemPersister and set the path to the individual channels'
/// files.
@@ -103,6 +110,13 @@ impl FilesystemPersister {
util::write_to_file(path, "manager".to_string(), manager)
}

/// Write the provided `NetworkGraph` to a file called "network_graph" in the
/// given `data_dir`.
pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
let path = PathBuf::from(data_dir);
util::write_to_file(path, "network_graph".to_string(), network_graph)
}
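A usage sketch for the new helper, including the read-back side that this file does not provide. Reading via `NetworkGraph`'s `Readable` impl is an assumption about the rust-lightning version in play, and the path construction is illustrative:

```rust
use std::fs::File;
use std::io::BufReader;
use lightning::routing::network_graph::NetworkGraph;
use lightning::util::ser::Readable;
use lightning_persister::FilesystemPersister;

fn save_and_reload_graph(data_dir: String, graph: &NetworkGraph) -> Result<NetworkGraph, std::io::Error> {
	// Persist to <data_dir>/network_graph, as implemented above.
	FilesystemPersister::persist_network_graph(data_dir.clone(), graph)?;

	// Reload it, e.g. on the next startup.
	let file = File::open(format!("{}/network_graph", data_dir))?;
	NetworkGraph::read(&mut BufReader::new(file))
		.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "failed to decode NetworkGraph"))
}
```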

/// Read `ChannelMonitor`s from disk.
pub fn read_channelmonitors<Signer: Sign, K: Deref> (
&self, keys_manager: K