From 9add75de0c902e113926d197afc2237279f7f339 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Thu, 14 Apr 2022 16:57:06 -0400 Subject: [PATCH 01/10] generic sensei persistence layer --- Cargo.lock | 13 +- Cargo.toml | 12 +- src/chain/bitcoind_client.rs | 10 +- src/disk.rs | 121 +------------------ src/lib/mod.rs | 1 + src/lib/persist.rs | 223 +++++++++++++++++++++++++++++++++++ src/node.rs | 53 ++++----- 7 files changed, 259 insertions(+), 174 deletions(-) create mode 100644 src/lib/persist.rs diff --git a/Cargo.lock b/Cargo.lock index fcecb56..2c6c449 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1075,8 +1075,6 @@ dependencies = [ [[package]] name = "lightning" version = "0.0.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "580647f97f8e6d138ad724027c8ca9b890b1001b05374c270bbee4c10309b641" dependencies = [ "bitcoin", "secp256k1", @@ -1085,8 +1083,6 @@ dependencies = [ [[package]] name = "lightning-background-processor" version = "0.0.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a9c8c4c6e4b9652287d8ce2fc3a168861fdb40eb68793567be5ed77ecce6eb" dependencies = [ "bitcoin", "lightning", @@ -1096,11 +1092,10 @@ dependencies = [ [[package]] name = "lightning-block-sync" version = "0.0.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8731c4f20bd4e0d588db6e849ac2114674a112cdfda65354cf9b4cf6018878a" dependencies = [ "bitcoin", "chunked_transfer", + "futures", "lightning", "serde", "serde_json", @@ -1109,8 +1104,6 @@ dependencies = [ [[package]] name = "lightning-invoice" version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f069b6eb46d7639d07977d14dc7d9a40d9d8bc5ac2b47f1924318fec13edd5c0" dependencies = [ "bech32", "bitcoin_hashes", @@ -1122,8 +1115,6 @@ dependencies = [ [[package]] name = "lightning-net-tokio" version = "0.0.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85119f898ac097d46c17a0ad7dda0f6ef6b923e5bcb4d1a5e39d33c3c68aa7bc" dependencies = [ "bitcoin", "lightning", @@ -1133,8 +1124,6 @@ dependencies = [ [[package]] name = "lightning-persister" version = "0.0.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f8aac01a61b302f3928adf235660c38aa5c246113fc7d19cc4cb60b5f53b7ae" dependencies = [ "bitcoin", "libc", diff --git a/Cargo.toml b/Cargo.toml index b40a2e6..293f3a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,12 @@ name = "senseid" path = "src/main.rs" [dependencies] -lightning = { version = "0.0.106", features = ["max_level_trace"] } -lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ] } -lightning-invoice = { version = "0.14.0" } -lightning-net-tokio = { version = "0.0.106" } -lightning-persister = { version = "0.0.106" } -lightning-background-processor = { version = "0.0.106" } +lightning = { version = "0.0.106", features = ["max_level_trace"], path = "/Users/developer/Development/rust-lightning/lightning" } +lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ], path = "/Users/developer/Development/rust-lightning/lightning-block-sync" } +lightning-invoice = { version = "0.14.0", path = "/Users/developer/Development/rust-lightning/lightning-invoice" } +lightning-net-tokio = { version = "0.0.106", path = "/Users/developer/Development/rust-lightning/lightning-net-tokio" } +lightning-persister = { version = "0.0.106", path = "/Users/developer/Development/rust-lightning/lightning-persister" } +lightning-background-processor = { version = "0.0.106", path = "/Users/developer/Development/rust-lightning/lightning-background-processor" } base64 = "0.13.0" bitcoin = "0.27" diff --git a/src/chain/bitcoind_client.rs b/src/chain/bitcoind_client.rs index 351d8e4..73a54f0 100644 --- a/src/chain/bitcoind_client.rs +++ b/src/chain/bitcoind_client.rs @@ -74,29 +74,29 @@ pub enum Target { impl BlockSource for &BitcoindClient { fn get_header<'a>( - &'a mut self, + &'a self, header_hash: &'a BlockHash, height_hint: Option, ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { - let mut rpc = self.bitcoind_rpc_client.lock().await; + let rpc = self.bitcoind_rpc_client.lock().await; rpc.get_header(header_hash, height_hint).await }) } fn get_block<'a>( - &'a mut self, + &'a self, header_hash: &'a BlockHash, ) -> AsyncBlockSourceResult<'a, Block> { Box::pin(async move { - let mut rpc = self.bitcoind_rpc_client.lock().await; + let rpc = self.bitcoind_rpc_client.lock().await; rpc.get_block(header_hash).await }) } fn get_best_block(&mut self) -> AsyncBlockSourceResult<(BlockHash, Option)> { Box::pin(async move { - let mut rpc = self.bitcoind_rpc_client.lock().await; + let rpc = self.bitcoind_rpc_client.lock().await; rpc.get_best_block().await }) } diff --git a/src/disk.rs b/src/disk.rs index 2acbd71..aed3f3b 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -7,27 +7,10 @@ // You may not use this file except in accordance with one or both of these // licenses. -use crate::chain::broadcaster::SenseiBroadcaster; -use crate::chain::fee_estimator::SenseiFeeEstimator; -use crate::node::{self, ChainMonitor, ChannelManager}; -use bitcoin::secp256k1::key::PublicKey; -use bitcoin::BlockHash; use chrono::Utc; -use lightning::chain::keysinterface::{InMemorySigner, KeysManager}; -use lightning::routing::network_graph::NetworkGraph; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}; use lightning::util::logger::{Logger, Record}; -use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; -use lightning_background_processor::Persister; -use lightning_persister::FilesystemPersister; - -use std::collections::HashMap; +use lightning::util::ser::{Writer}; use std::fs; -use std::fs::File; -use std::io::{BufRead, BufReader, BufWriter}; -use std::net::SocketAddr; -use std::path::Path; -use std::sync::Arc; pub struct FilesystemLogger { data_dir: String, @@ -65,105 +48,3 @@ impl Logger for FilesystemLogger { .unwrap(); } } -pub fn persist_channel_peer(path: &Path, peer_info: &str) -> std::io::Result<()> { - let mut file = fs::OpenOptions::new() - .create(true) - .append(true) - .open(path)?; - file.write_all(format!("{}\n", peer_info).as_bytes()) -} - -pub fn read_channel_peer_data( - path: &Path, -) -> Result, std::io::Error> { - let mut peer_data = HashMap::new(); - if !Path::new(&path).exists() { - return Ok(HashMap::new()); - } - let file = File::open(path)?; - let reader = BufReader::new(file); - for line in reader.lines() { - match node::parse_peer_info(line.unwrap()) { - Ok((pubkey, socket_addr)) => { - peer_data.insert(pubkey, socket_addr); - } - Err(e) => return Err(e), - } - } - Ok(peer_data) -} - -pub fn read_network(path: &Path, genesis_hash: BlockHash) -> NetworkGraph { - if let Ok(file) = File::open(path) { - if let Ok(graph) = NetworkGraph::read(&mut BufReader::new(file)) { - return graph; - } - } - NetworkGraph::new(genesis_hash) -} - -pub fn persist_scorer( - path: &Path, - scorer: &ProbabilisticScorer>, -) -> std::io::Result<()> { - let mut tmp_path = path.to_path_buf().into_os_string(); - tmp_path.push(".tmp"); - let file = fs::OpenOptions::new() - .write(true) - .create(true) - .open(&tmp_path)?; - let write_res = scorer.write(&mut BufWriter::new(file)); - if let Err(e) = write_res.and_then(|_| fs::rename(&tmp_path, path)) { - let _ = fs::remove_file(&tmp_path); - Err(e) - } else { - Ok(()) - } -} - -pub fn read_scorer( - path: &Path, - graph: Arc, -) -> ProbabilisticScorer> { - let params = ProbabilisticScoringParameters::default(); - if let Ok(file) = File::open(path) { - if let Ok(scorer) = - ProbabilisticScorer::read(&mut BufReader::new(file), (params, Arc::clone(&graph))) - { - return scorer; - } - } - ProbabilisticScorer::new(params, graph) -} - -pub struct DataPersister { - pub data_dir: String, - pub external_router: bool, -} - -impl - Persister< - InMemorySigner, - Arc, - Arc, - Arc, - Arc, - Arc, - > for DataPersister -{ - fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), std::io::Error> { - FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager) - } - - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> { - if !self.external_router - && FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph) - .is_err() - { - // Persistence errors here are non-fatal as we can just fetch the routing graph - // again later, but they may indicate a disk error which could be fatal elsewhere. - eprintln!("Warning: Failed to persist network graph, check your disk and permissions"); - } - Ok(()) - } -} diff --git a/src/lib/mod.rs b/src/lib/mod.rs index 8a75814..1002096 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -8,3 +8,4 @@ // licenses. pub mod network_graph; +pub mod persist; \ No newline at end of file diff --git a/src/lib/persist.rs b/src/lib/persist.rs new file mode 100644 index 0000000..88f404e --- /dev/null +++ b/src/lib/persist.rs @@ -0,0 +1,223 @@ +use std::{path::{PathBuf, Path}, io::{Cursor}, fs::{self}, ops::Deref, sync::Arc, collections::HashMap, net::SocketAddr}; + +use bitcoin::{BlockHash, Txid, Network, blockdata::constants::genesis_block, hashes::hex::FromHex}; +use lightning::{util::{persist::KVStorePersister, ser::{Writeable, Readable}}, chain::{keysinterface::{Sign, KeysInterface}, channelmonitor::ChannelMonitor}, routing::{network_graph::NetworkGraph, scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}}}; +use lightning_persister::FilesystemPersister; +use bitcoin::secp256k1::key::PublicKey; +use lightning::util::ser::ReadableArgs; + +use crate::node; + + +pub trait KVStoreReader { + fn read(&self, key: &str) -> std::io::Result>>; + fn list(&self, key: &str) -> std::io::Result>; +} + +pub struct FileStore { + filesystem_persister: FilesystemPersister +} + +impl KVStorePersister for FileStore { + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + self.filesystem_persister.persist(key, object) + } +} + +impl KVStoreReader for FileStore { + fn read(&self, key: &str) -> std::io::Result>> { + let full_path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key); + let path = PathBuf::from(full_path); + match fs::read(path) { + Ok(contents) => Ok(Some(contents)), + Err(_) => Ok(None) + } + } + + fn list(&self, key: &str) -> std::io::Result> { + let path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key); + if !Path::new(&PathBuf::from(&path)).exists() { + return Ok(Vec::new()); + } + let mut res = Vec::new(); + for file_option in fs::read_dir(path).unwrap() { + let file = file_option.unwrap(); + let owned_file_name = file.file_name(); + if let Some(filename) = owned_file_name.to_str() { + res.push(filename.to_string()) + } + } + Ok(res) + } +} + +impl FileStore { + pub fn new(root: String) -> Self { + Self { + filesystem_persister: FilesystemPersister::new(root) + } + } +} + +pub enum AnyKVStore { + File(FileStore) +} + +impl KVStorePersister for AnyKVStore { + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + match self { + AnyKVStore::File(store) => store.persist(key, object) + } + } +} + +impl KVStoreReader for AnyKVStore { + fn read(&self, key: &str) -> std::io::Result>> { + match self { + AnyKVStore::File(store) => store.read(key) + } + } + + fn list(&self, key: &str) -> std::io::Result> { + match self { + AnyKVStore::File(store) => store.list(key) + } + } +} + +pub struct SenseiPersister { + store: AnyKVStore, + network: Network, +} + +impl SenseiPersister { + pub fn new(store: AnyKVStore, network: Network) -> Self { + Self { store, network } + } + + pub fn read_channel_manager(&self) -> std::io::Result>> { + self.store.read("manager") + } + + pub fn read_network_graph(&self) -> NetworkGraph { + if let Ok(Some(contents)) = self.store.read("network_graph") { + let mut cursor = Cursor::new(contents); + if let Ok(graph) = NetworkGraph::read(&mut cursor) { + return graph; + } + } + + let genesis_hash = genesis_block(self.network).header.block_hash(); + NetworkGraph::new(genesis_hash) + } + + pub fn read_scorer(&self, network_graph: Arc) -> ProbabilisticScorer> { + let params = ProbabilisticScoringParameters::default(); + if let Ok(Some(contents)) = self.store.read("scorer") { + let mut cursor = Cursor::new(contents); + if let Ok(scorer) = ProbabilisticScorer::read(&mut cursor, (params, Arc::clone(&network_graph))) { + return scorer; + } + } + ProbabilisticScorer::new(params, network_graph) + } + + pub fn persist_scorer(&self, scorer: &ProbabilisticScorer>) -> std::io::Result<()> { + self.store.persist("scorer", scorer) + } + + fn get_raw_channel_peer_data(&self) -> String { + if let Ok(Some(contents)) = self.store.read("channel_peer_data") { + if let Ok(channel_peer_data) = String::read(&mut Cursor::new(contents)) { + return channel_peer_data + } + } + + String::new() + } + + pub fn persist_channel_peer(&self, peer_info: &str) -> std::io::Result<()> { + let mut peer_data = self.get_raw_channel_peer_data(); + peer_data.push_str( peer_info); + peer_data.push_str("\n"); + self.store.persist("channel_peer_data", &peer_data) + } + + pub fn read_channel_peer_data(&self) -> Result, std::io::Error> { + let mut peer_data = HashMap::new(); + let raw_peer_data = self.get_raw_channel_peer_data(); + for line in raw_peer_data.lines() { + match node::parse_peer_info(line.to_string()) { + Ok((pubkey, socket_addr)) => { + peer_data.insert(pubkey, socket_addr); + } + Err(e) => return Err(e), + } + } + Ok(peer_data) + } + + /// Read `ChannelMonitor`s from disk. + pub fn read_channelmonitors ( + &self, keys_manager: K + ) -> Result)>, std::io::Error> + where K::Target: KeysInterface + Sized, + { + let filenames = self.store.list("monitors").unwrap(); + + let mut res = Vec::new(); + for filename in filenames { + if !filename.is_ascii() || filename.len() < 65 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid ChannelMonitor file name", + )); + } + if filename.ends_with(".tmp") { + // If we were in the middle of committing an new update and crashed, it should be + // safe to ignore the update - we should never have returned to the caller and + // irrevocably committed to the new state in any way. + continue; + } + + let txid = Txid::from_hex(filename.split_at(64).0); + if txid.is_err() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid tx ID in filename", + )); + } + + let index: Result = filename.split_at(65).1.parse(); + if index.is_err() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid tx index in filename", + )); + } + + let monitor_path = format!("monitors/{}", filename); + let contents = self.store.read(&monitor_path)?.unwrap(); + let mut buffer = Cursor::new(&contents); + match <(BlockHash, ChannelMonitor)>::read(&mut buffer, &*keys_manager) { + Ok((blockhash, channel_monitor)) => { + if channel_monitor.get_funding_txo().0.txid != txid.unwrap() || channel_monitor.get_funding_txo().0.index != index.unwrap() { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "ChannelMonitor was stored in the wrong file")); + } + res.push((blockhash, channel_monitor)); + } + Err(e) => return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to deserialize ChannelMonitor: {}", e), + )) + } + } + Ok(res) + } +} + +impl KVStorePersister for SenseiPersister { + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + self.store.persist(key, object) + } +} \ No newline at end of file diff --git a/src/node.rs b/src/node.rs index c8080cf..8ff4fa9 100644 --- a/src/node.rs +++ b/src/node.rs @@ -13,7 +13,7 @@ use crate::chain::listener_database::ListenerDatabase; use crate::chain::manager::SenseiChainManager; use crate::config::LightningNodeConfig; use crate::database::node::NodeDatabase; -use crate::disk::{DataPersister, FilesystemLogger}; +use crate::disk::{FilesystemLogger}; use crate::error::Error; use crate::event_handler::LightningNodeEventHandler; use crate::lib::network_graph::OptionalNetworkGraphMsgHandler; @@ -21,6 +21,7 @@ use crate::services::node::{Channel, NodeInfo, NodeRequest, NodeRequestError, No use crate::services::{PaginationRequest, PaginationResponse, PaymentsFilter}; use crate::utils::PagedVec; use crate::{database, disk, hex_utils}; +use crate::lib::persist::{AnyKVStore, FileStore, SenseiPersister}; use bdk::database::SqliteDatabase; use bdk::keys::ExtendedKey; use bdk::wallet::AddressIndex; @@ -29,6 +30,7 @@ use bitcoin::hashes::Hash; use lightning::chain::channelmonitor::ChannelMonitor; use lightning::ln::msgs::NetAddress; +use lightning::util::persist::Persister; use lightning_invoice::payment::PaymentError; use tindercrypt::cryptors::RingCryptor; @@ -57,13 +59,12 @@ use lightning_background_processor::BackgroundProcessor; use lightning_invoice::utils::DefaultRouter; use lightning_invoice::{payment, utils, Currency, Invoice}; use lightning_net_tokio::SocketDescriptor; -use lightning_persister::FilesystemPersister; use macaroon::Macaroon; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; use std::fmt::Display; use std::fs::File; -use std::io::Read; +use std::io::{Read, Cursor}; use std::io::Write; use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; use std::path::Path; @@ -145,7 +146,7 @@ pub type ChainMonitor = chainmonitor::ChainMonitor< Arc, Arc, Arc, - Arc, + Arc, >; trait MustSized: Sized {} @@ -257,6 +258,7 @@ pub struct LightningNode { pub invoice_payer: Arc, pub scorer: Arc, Instant>>>, pub stop_listen: Arc, + pub persister: Arc } impl LightningNode { @@ -275,7 +277,7 @@ impl LightningNode { if decrypted_seed.len() != 32 { return Err(Error::InvalidSeedLength); } - seed.copy_from_slice(decrypted_seed.as_slice()); + seed.copy_from_slice(decrypted_seed.as_slice()); } None => { thread_rng().fill_bytes(&mut seed); @@ -414,8 +416,8 @@ impl LightningNode { listener_database: listener_database.clone(), }); - let persister = Arc::new(FilesystemPersister::new(data_dir)); - + let persistence_store = FileStore::new(data_dir); + let persister = Arc::new(SenseiPersister::new(AnyKVStore::File(persistence_store), config.network.clone())); let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos())); let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( @@ -437,7 +439,7 @@ impl LightningNode { let best_block = chain_manager.get_best_block().await?; let (channel_manager_blockhash, channel_manager) = { - if let Ok(mut f) = fs::File::open(channel_manager_path) { + if let Ok(Some(contents)) = persister.read_channel_manager() { let mut channel_monitor_mut_references = Vec::new(); for (_, channel_monitor) in channelmonitors.iter_mut() { channel_monitor_mut_references.push(channel_monitor); @@ -451,7 +453,8 @@ impl LightningNode { user_config, channel_monitor_mut_references, ); - <(BlockHash, ChannelManager)>::read(&mut f, read_args).unwrap() + let mut buffer = Cursor::new(&contents); + <(BlockHash, ChannelManager)>::read(&mut buffer, read_args).unwrap() } else { // TODO: in reality we could error for other reasons when there's supposed to be // an existing chanenl manager. need to handle this the same way we do for seed file @@ -546,14 +549,7 @@ impl LightningNode { let network_graph = match network_graph { Some(network_graph) => network_graph, - None => { - let genesis = genesis_block(config.network).header.block_hash(); - - Arc::new(disk::read_network( - Path::new(&config.network_graph_path()), - genesis, - )) - } + None => Arc::new(persister.read_network_graph()) }; let network_graph_msg_handler: Arc = @@ -593,10 +589,7 @@ impl LightningNode { )); let scorer_path = config.scorer_path(); - let scorer = Arc::new(Mutex::new(disk::read_scorer( - Path::new(&scorer_path), - Arc::clone(&network_graph), - ))); + let scorer = Arc::new(Mutex::new(persister.read_scorer(Arc::clone(&network_graph)))); let router = DefaultRouter::new( network_graph.clone(), @@ -653,6 +646,7 @@ impl LightningNode { scorer, invoice_payer, stop_listen, + persister }) } @@ -687,13 +681,14 @@ impl LightningNode { })); let scorer_path = self.config.scorer_path(); + let scorer_persister = Arc::clone(&self.persister); let scorer_persist = Arc::clone(&self.scorer); handles.push(tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(600)); loop { interval.tick().await; - if disk::persist_scorer(Path::new(&scorer_path), &scorer_persist.lock().unwrap()) + if scorer_persister.persist_scorer(&scorer_persist.lock().unwrap()) .is_err() { // Persistence errors here are non-fatal as channels will be re-scored as payments @@ -703,15 +698,12 @@ impl LightningNode { } })); - let persister = DataPersister { - data_dir: self.config.data_dir(), - external_router: self.config.external_router, - }; + let bg_persister = Arc::clone(&self.persister); // TODO: should we allow 'child' nodes to update NetworkGraph based on payment failures? // feels like probably but depends on exactly what is updated let background_processor = BackgroundProcessor::start( - persister, + bg_persister, self.invoice_payer.clone(), self.chain_monitor.clone(), self.channel_manager.clone(), @@ -725,12 +717,12 @@ impl LightningNode { let channel_peer_data_path = config.channel_peer_data_path(); let channel_manager_reconnect = self.channel_manager.clone(); let peer_manager_reconnect = self.peer_manager.clone(); - + let persister_peer = self.persister.clone(); handles.push(tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { interval.tick().await; - match disk::read_channel_peer_data(Path::new(&channel_peer_data_path)) { + match persister_peer.read_channel_peer_data() { Ok(mut info) => { for (pubkey, peer_addr) in info.drain() { for chan_info in channel_manager_reconnect.list_channels() { @@ -1279,8 +1271,7 @@ impl LightningNode { let res = self.open_channel(pubkey, amt_satoshis, 0, 0, public); if res.is_ok() { - let _ = disk::persist_channel_peer( - Path::new(&self.config.channel_peer_data_path()), + let _ = self.persister.persist_channel_peer( &node_connection_string, ); } From 2d8feb09d0f3da0d05ad3822888070f788478364 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Fri, 15 Apr 2022 13:45:14 -0400 Subject: [PATCH 02/10] chain manager works with trait objects instead of concrete BitcoindClient type --- src/chain/bitcoind_client.rs | 2 +- src/chain/manager.rs | 57 ++++++++++++++++-------------------- src/event_handler.rs | 6 ++-- src/main.rs | 23 ++++++++++++++- src/node.rs | 4 +-- src/services/admin.rs | 4 ++- 6 files changed, 56 insertions(+), 40 deletions(-) diff --git a/src/chain/bitcoind_client.rs b/src/chain/bitcoind_client.rs index 73a54f0..e4bc133 100644 --- a/src/chain/bitcoind_client.rs +++ b/src/chain/bitcoind_client.rs @@ -72,7 +72,7 @@ pub enum Target { HighPriority, } -impl BlockSource for &BitcoindClient { +impl BlockSource for BitcoindClient { fn get_header<'a>( &'a self, header_hash: &'a BlockHash, diff --git a/src/chain/manager.rs b/src/chain/manager.rs index 3e25b1b..a2be9c6 100644 --- a/src/chain/manager.rs +++ b/src/chain/manager.rs @@ -11,50 +11,39 @@ use crate::{ node::{ChainMonitor, ChannelManager}, }; use bitcoin::BlockHash; -use lightning::chain::{BestBlock, Listen}; +use lightning::chain::{BestBlock, Listen, chaininterface::{FeeEstimator, BroadcasterInterface}}; use lightning_block_sync::SpvClient; use lightning_block_sync::{init, poll, UnboundedCache}; use lightning_block_sync::{poll::ValidatedBlockHeader, BlockSource}; use std::ops::Deref; -use super::{ - bitcoind_client::BitcoindClient, listener::SenseiChainListener, - listener_database::ListenerDatabase, -}; +use super::{listener::SenseiChainListener, listener_database::ListenerDatabase }; pub struct SenseiChainManager { config: SenseiConfig, pub listener: Arc, - pub bitcoind_client: Arc, + pub block_source: Arc, + pub fee_estimator: Arc, + pub broadcaster: Arc, poller_paused: Arc, } impl SenseiChainManager { - pub async fn new(config: SenseiConfig) -> Result { - let listener = Arc::new(SenseiChainListener::new()); - - let bitcoind_client = Arc::new( - BitcoindClient::new( - config.bitcoind_rpc_host.clone(), - config.bitcoind_rpc_port, - config.bitcoind_rpc_username.clone(), - config.bitcoind_rpc_password.clone(), - tokio::runtime::Handle::current(), - ) - .await - .expect("invalid bitcoind rpc config"), - ); - - let poller_paused = Arc::new(AtomicBool::new(false)); - - let block_source_poller = bitcoind_client.clone(); + pub async fn new( + config: SenseiConfig, + block_source: Arc, + fee_estimator: Arc, + broadcaster: Arc + ) -> Result { + let listener = Arc::new(SenseiChainListener::new()); + let block_source_poller = block_source.clone(); let listener_poller = listener.clone(); + let poller_paused = Arc::new(AtomicBool::new(false)); let poller_paused_poller = poller_paused.clone(); tokio::spawn(async move { - let derefed = &mut block_source_poller.deref(); let mut cache = UnboundedCache::new(); - let chain_tip = init::validate_best_block_header(derefed).await.unwrap(); - let chain_poller = poll::ChainPoller::new(derefed, config.network); + let chain_tip = init::validate_best_block_header(block_source_poller.clone()).await.unwrap(); + let chain_poller = poll::ChainPoller::new(block_source_poller, config.network); let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, listener_poller); loop { @@ -68,8 +57,10 @@ impl SenseiChainManager { Ok(Self { config, listener, - bitcoind_client, poller_paused, + block_source, + fee_estimator, + broadcaster }) } @@ -78,7 +69,7 @@ impl SenseiChainManager { chain_listeners: Vec<(BlockHash, &(dyn Listen + Send + Sync))>, ) -> Result { let chain_tip = init::synchronize_listeners( - &mut self.bitcoind_client.deref(), + self.block_source.clone(), self.config.network, &mut UnboundedCache::new(), chain_listeners, @@ -121,8 +112,10 @@ impl SenseiChainManager { } pub async fn get_best_block(&self) -> Result { - let mut block_source = self.bitcoind_client.deref(); - let (latest_blockhash, latest_height) = block_source.get_best_block().await.unwrap(); - Ok(BestBlock::new(latest_blockhash, latest_height.unwrap())) + let (latest_blockhash, latest_height) = self.block_source.get_best_block().await.unwrap(); + Ok(BestBlock::new( + latest_blockhash, + latest_height.unwrap(), + )) } } diff --git a/src/event_handler.rs b/src/event_handler.rs index ccf0ccb..3a8b931 100644 --- a/src/event_handler.rs +++ b/src/event_handler.rs @@ -73,7 +73,7 @@ impl EventHandler for LightningNodeEventHandler { let mut tx_builder = wallet.build_tx(); let _fee_sats_per_1000_wu = self .chain_manager - .bitcoind_client + .fee_estimator .get_est_sat_per_1000_weight(ConfirmationTarget::Normal); // TODO: is this the correct conversion?? @@ -273,7 +273,7 @@ impl EventHandler for LightningNodeEventHandler { let tx_feerate = self .chain_manager - .bitcoind_client + .fee_estimator .get_est_sat_per_1000_weight(ConfirmationTarget::Normal); let spending_tx = self @@ -288,7 +288,7 @@ impl EventHandler for LightningNodeEventHandler { .unwrap(); self.chain_manager - .bitcoind_client + .broadcaster .broadcast_transaction(&spending_tx); } Event::ChannelClosed { diff --git a/src/main.rs b/src/main.rs index 4abf014..8c2c25c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,7 +22,8 @@ mod node; mod services; mod utils; -use crate::config::SenseiConfig; +use crate::chain::bitcoind_client::BitcoindClient; +use crate::{config::SenseiConfig, chain::manager::SenseiChainManager}; use crate::database::admin::AdminDatabase; use crate::http::admin::add_routes as add_admin_routes; use crate::http::node::add_routes as add_node_routes; @@ -159,11 +160,31 @@ async fn main() { let addr = SocketAddr::from(([0, 0, 0, 0], config.api_port)); let node_directory = Arc::new(Mutex::new(HashMap::new())); + let bitcoind_client = Arc::new( + BitcoindClient::new( + config.bitcoind_rpc_host.clone(), + config.bitcoind_rpc_port, + config.bitcoind_rpc_username.clone(), + config.bitcoind_rpc_password.clone(), + tokio::runtime::Handle::current(), + ) + .await + .expect("invalid bitcoind rpc config"), + ); + + let chain_manager = Arc::new(SenseiChainManager::new( + config.clone(), + bitcoind_client.clone(), + bitcoind_client.clone(), + bitcoind_client + ).await.unwrap()); + let admin_service = AdminService::new( &sensei_dir, config.clone(), node_directory.clone(), database, + chain_manager ) .await; diff --git a/src/node.rs b/src/node.rs index 8ff4fa9..8167ad0 100644 --- a/src/node.rs +++ b/src/node.rs @@ -408,11 +408,11 @@ impl LightningNode { let logger = Arc::new(FilesystemLogger::new(data_dir.clone())); let fee_estimator = Arc::new(SenseiFeeEstimator { - fee_estimator: chain_manager.bitcoind_client.clone(), + fee_estimator: chain_manager.fee_estimator.clone() }); let broadcaster = Arc::new(SenseiBroadcaster { - broadcaster: chain_manager.bitcoind_client.clone(), + broadcaster: chain_manager.broadcaster.clone(), listener_database: listener_database.clone(), }); diff --git a/src/services/admin.rs b/src/services/admin.rs index 4fcf680..c58cb3c 100644 --- a/src/services/admin.rs +++ b/src/services/admin.rs @@ -8,6 +8,7 @@ // licenses. use super::{PaginationRequest, PaginationResponse}; +use crate::chain::bitcoind_client::BitcoindClient; use crate::chain::manager::SenseiChainManager; use crate::database::admin::AccessToken; use crate::database::{ @@ -136,13 +137,14 @@ impl AdminService { config: SenseiConfig, node_directory: NodeDirectory, database: AdminDatabase, + chain_manager: Arc ) -> Self { Self { data_dir: String::from(data_dir), config: Arc::new(config.clone()), node_directory, database: Arc::new(Mutex::new(database)), - chain_manager: Arc::new(SenseiChainManager::new(config).await.unwrap()), + chain_manager, } } } From f164e4d7b196f4f87f57e097c40e0a049fe93ba2 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Fri, 15 Apr 2022 17:32:41 -0400 Subject: [PATCH 03/10] remove unused paths --- src/config.rs | 15 --------------- src/node.rs | 4 ---- 2 files changed, 19 deletions(-) diff --git a/src/config.rs b/src/config.rs index 8dac1ec..7537773 100644 --- a/src/config.rs +++ b/src/config.rs @@ -134,19 +134,4 @@ impl LightningNodeConfig { pub fn admin_macaroon_path(&self) -> String { format!("{}/admin.macaroon", self.data_dir()) } - pub fn seed_path(&self) -> String { - format!("{}/seed", self.data_dir()) - } - pub fn channel_manager_path(&self) -> String { - format!("{}/manager", self.data_dir()) - } - pub fn network_graph_path(&self) -> String { - format!("{}/network_graph", self.data_dir()) - } - pub fn scorer_path(&self) -> String { - format!("{}/scorer", self.data_dir()) - } - pub fn channel_peer_data_path(&self) -> String { - format!("{}/channel_peer_data", self.data_dir()) - } } diff --git a/src/node.rs b/src/node.rs index 8167ad0..6aa5201 100644 --- a/src/node.rs +++ b/src/node.rs @@ -371,7 +371,6 @@ impl LightningNode { let mut node_database = NodeDatabase::new(config.node_database_path()); let network = config.network; - let channel_manager_path = config.channel_manager_path(); let admin_macaroon_path = config.admin_macaroon_path(); let seed = @@ -588,7 +587,6 @@ impl LightningNode { Arc::new(IgnoringMessageHandler {}), )); - let scorer_path = config.scorer_path(); let scorer = Arc::new(Mutex::new(persister.read_scorer(Arc::clone(&network_graph)))); let router = DefaultRouter::new( @@ -680,7 +678,6 @@ impl LightningNode { } })); - let scorer_path = self.config.scorer_path(); let scorer_persister = Arc::clone(&self.persister); let scorer_persist = Arc::clone(&self.scorer); @@ -714,7 +711,6 @@ impl LightningNode { // Reconnect to channel peers if possible. - let channel_peer_data_path = config.channel_peer_data_path(); let channel_manager_reconnect = self.channel_manager.clone(); let peer_manager_reconnect = self.peer_manager.clone(); let persister_peer = self.persister.clone(); From 9ec2f9389c7ed5a583834c0529ed7053b2f0ab79 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Sun, 17 Apr 2022 22:26:15 -0400 Subject: [PATCH 04/10] clippy and fmt fixes --- Cargo.lock | 1 - src/chain/bitcoind_client.rs | 21 +- src/chain/manager.rs | 24 ++- src/disk.rs | 2 +- src/event_handler.rs | 5 +- src/lib/mod.rs | 2 +- src/lib/persist.rs | 395 +++++++++++++++++++---------------- src/main.rs | 20 +- src/node.rs | 37 ++-- src/services/admin.rs | 3 +- 10 files changed, 274 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c6c449..f7c3871 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1086,7 +1086,6 @@ version = "0.0.106" dependencies = [ "bitcoin", "lightning", - "lightning-persister", ] [[package]] diff --git a/src/chain/bitcoind_client.rs b/src/chain/bitcoind_client.rs index e4bc133..ed1d68f 100644 --- a/src/chain/bitcoind_client.rs +++ b/src/chain/bitcoind_client.rs @@ -84,17 +84,14 @@ impl BlockSource for BitcoindClient { }) } - fn get_block<'a>( - &'a self, - header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { Box::pin(async move { let rpc = self.bitcoind_rpc_client.lock().await; rpc.get_block(header_hash).await }) } - fn get_best_block(&mut self) -> AsyncBlockSourceResult<(BlockHash, Option)> { + fn get_best_block(&self) -> AsyncBlockSourceResult<(BlockHash, Option)> { Box::pin(async move { let rpc = self.bitcoind_rpc_client.lock().await; rpc.get_best_block().await @@ -116,7 +113,7 @@ impl BitcoindClient { let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); let rpc_credentials = base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone())); - let mut bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; + let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; let _dummy = bitcoind_rpc_client .call_method::("getblockchaininfo", &[]) .await @@ -153,7 +150,7 @@ impl BitcoindClient { handle.spawn(async move { loop { let background_estimate = { - let mut rpc = rpc_client.lock().await; + let rpc = rpc_client.lock().await; let background_conf_target = serde_json::json!(144); let background_estimate_mode = serde_json::json!("ECONOMICAL"); let resp = rpc @@ -170,7 +167,7 @@ impl BitcoindClient { }; let normal_estimate = { - let mut rpc = rpc_client.lock().await; + let rpc = rpc_client.lock().await; let normal_conf_target = serde_json::json!(18); let normal_estimate_mode = serde_json::json!("ECONOMICAL"); let resp = rpc @@ -187,7 +184,7 @@ impl BitcoindClient { }; let high_prio_estimate = { - let mut rpc = rpc_client.lock().await; + let rpc = rpc_client.lock().await; let high_prio_conf_target = serde_json::json!(6); let high_prio_estimate_mode = serde_json::json!("CONSERVATIVE"); let resp = rpc @@ -229,7 +226,7 @@ impl BitcoindClient { } pub async fn send_raw_transaction(&self, raw_tx: String) { - let mut rpc = self.bitcoind_rpc_client.lock().await; + let rpc = self.bitcoind_rpc_client.lock().await; let raw_tx_json = serde_json::json!(raw_tx); rpc.call_method::("sendrawtransaction", &[raw_tx_json]) @@ -238,7 +235,7 @@ impl BitcoindClient { } pub async fn get_blockchain_info(&self) -> BlockchainInfo { - let mut rpc = self.bitcoind_rpc_client.lock().await; + let rpc = self.bitcoind_rpc_client.lock().await; rpc.call_method::("getblockchaininfo", &[]) .await .unwrap() @@ -272,7 +269,7 @@ impl BroadcasterInterface for BitcoindClient { let bitcoind_rpc_client = self.bitcoind_rpc_client.clone(); let tx_serialized = serde_json::json!(encode::serialize_hex(tx)); self.handle.spawn(async move { - let mut rpc = bitcoind_rpc_client.lock().await; + let rpc = bitcoind_rpc_client.lock().await; // This may error due to RL calling `broadcast_transaction` with the same transaction // multiple times, but the error is safe to ignore. match rpc diff --git a/src/chain/manager.rs b/src/chain/manager.rs index a2be9c6..f9b8c73 100644 --- a/src/chain/manager.rs +++ b/src/chain/manager.rs @@ -11,13 +11,16 @@ use crate::{ node::{ChainMonitor, ChannelManager}, }; use bitcoin::BlockHash; -use lightning::chain::{BestBlock, Listen, chaininterface::{FeeEstimator, BroadcasterInterface}}; +use lightning::chain::{ + chaininterface::{BroadcasterInterface, FeeEstimator}, + BestBlock, Listen, +}; use lightning_block_sync::SpvClient; use lightning_block_sync::{init, poll, UnboundedCache}; use lightning_block_sync::{poll::ValidatedBlockHeader, BlockSource}; use std::ops::Deref; -use super::{listener::SenseiChainListener, listener_database::ListenerDatabase }; +use super::{listener::SenseiChainListener, listener_database::ListenerDatabase}; pub struct SenseiChainManager { config: SenseiConfig, @@ -30,19 +33,21 @@ pub struct SenseiChainManager { impl SenseiChainManager { pub async fn new( - config: SenseiConfig, + config: SenseiConfig, block_source: Arc, fee_estimator: Arc, - broadcaster: Arc + broadcaster: Arc, ) -> Result { - let listener = Arc::new(SenseiChainListener::new()); + let listener = Arc::new(SenseiChainListener::new()); let block_source_poller = block_source.clone(); let listener_poller = listener.clone(); let poller_paused = Arc::new(AtomicBool::new(false)); let poller_paused_poller = poller_paused.clone(); tokio::spawn(async move { let mut cache = UnboundedCache::new(); - let chain_tip = init::validate_best_block_header(block_source_poller.clone()).await.unwrap(); + let chain_tip = init::validate_best_block_header(block_source_poller.clone()) + .await + .unwrap(); let chain_poller = poll::ChainPoller::new(block_source_poller, config.network); let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, listener_poller); @@ -60,7 +65,7 @@ impl SenseiChainManager { poller_paused, block_source, fee_estimator, - broadcaster + broadcaster, }) } @@ -113,9 +118,6 @@ impl SenseiChainManager { pub async fn get_best_block(&self) -> Result { let (latest_blockhash, latest_height) = self.block_source.get_best_block().await.unwrap(); - Ok(BestBlock::new( - latest_blockhash, - latest_height.unwrap(), - )) + Ok(BestBlock::new(latest_blockhash, latest_height.unwrap())) } } diff --git a/src/disk.rs b/src/disk.rs index aed3f3b..1eac411 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -9,7 +9,7 @@ use chrono::Utc; use lightning::util::logger::{Logger, Record}; -use lightning::util::ser::{Writer}; +use lightning::util::ser::Writer; use std::fs; pub struct FilesystemLogger { diff --git a/src/event_handler.rs b/src/event_handler.rs index 3a8b931..0527344 100644 --- a/src/event_handler.rs +++ b/src/event_handler.rs @@ -19,10 +19,7 @@ use bdk::{FeeRate, SignOptions}; use bitcoin::{secp256k1::Secp256k1, Network}; use bitcoin_bech32::WitnessProgram; use lightning::{ - chain::{ - chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}, - keysinterface::KeysManager, - }, + chain::{chaininterface::ConfirmationTarget, keysinterface::KeysManager}, util::events::{Event, EventHandler, PaymentPurpose}, }; use rand::{thread_rng, Rng}; diff --git a/src/lib/mod.rs b/src/lib/mod.rs index 1002096..47061da 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -8,4 +8,4 @@ // licenses. pub mod network_graph; -pub mod persist; \ No newline at end of file +pub mod persist; diff --git a/src/lib/persist.rs b/src/lib/persist.rs index 88f404e..f4f81c0 100644 --- a/src/lib/persist.rs +++ b/src/lib/persist.rs @@ -1,223 +1,262 @@ -use std::{path::{PathBuf, Path}, io::{Cursor}, fs::{self}, ops::Deref, sync::Arc, collections::HashMap, net::SocketAddr}; +use std::{ + collections::HashMap, + fs::{self}, + io::Cursor, + net::SocketAddr, + ops::Deref, + path::{Path, PathBuf}, + sync::Arc, +}; -use bitcoin::{BlockHash, Txid, Network, blockdata::constants::genesis_block, hashes::hex::FromHex}; -use lightning::{util::{persist::KVStorePersister, ser::{Writeable, Readable}}, chain::{keysinterface::{Sign, KeysInterface}, channelmonitor::ChannelMonitor}, routing::{network_graph::NetworkGraph, scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}}}; -use lightning_persister::FilesystemPersister; use bitcoin::secp256k1::key::PublicKey; +use bitcoin::{ + blockdata::constants::genesis_block, hashes::hex::FromHex, BlockHash, Network, Txid, +}; use lightning::util::ser::ReadableArgs; +use lightning::{ + chain::{ + channelmonitor::ChannelMonitor, + keysinterface::{KeysInterface, Sign}, + }, + routing::{ + network_graph::NetworkGraph, + scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}, + }, + util::{ + persist::KVStorePersister, + ser::{Readable, Writeable}, + }, +}; +use lightning_persister::FilesystemPersister; use crate::node; - pub trait KVStoreReader { - fn read(&self, key: &str) -> std::io::Result>>; - fn list(&self, key: &str) -> std::io::Result>; + fn read(&self, key: &str) -> std::io::Result>>; + fn list(&self, key: &str) -> std::io::Result>; } pub struct FileStore { - filesystem_persister: FilesystemPersister + filesystem_persister: FilesystemPersister, } impl KVStorePersister for FileStore { - fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { - self.filesystem_persister.persist(key, object) - } + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + self.filesystem_persister.persist(key, object) + } } impl KVStoreReader for FileStore { - fn read(&self, key: &str) -> std::io::Result>> { - let full_path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key); - let path = PathBuf::from(full_path); - match fs::read(path) { - Ok(contents) => Ok(Some(contents)), - Err(_) => Ok(None) - } - } - - fn list(&self, key: &str) -> std::io::Result> { - let path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key); - if !Path::new(&PathBuf::from(&path)).exists() { - return Ok(Vec::new()); - } - let mut res = Vec::new(); - for file_option in fs::read_dir(path).unwrap() { - let file = file_option.unwrap(); - let owned_file_name = file.file_name(); - if let Some(filename) = owned_file_name.to_str() { - res.push(filename.to_string()) - } - } - Ok(res) - } + fn read(&self, key: &str) -> std::io::Result>> { + let full_path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key); + let path = PathBuf::from(full_path); + match fs::read(path) { + Ok(contents) => Ok(Some(contents)), + Err(_) => Ok(None), + } + } + + fn list(&self, key: &str) -> std::io::Result> { + let path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key); + if !Path::new(&PathBuf::from(&path)).exists() { + return Ok(Vec::new()); + } + let mut res = Vec::new(); + for file_option in fs::read_dir(path).unwrap() { + let file = file_option.unwrap(); + let owned_file_name = file.file_name(); + if let Some(filename) = owned_file_name.to_str() { + res.push(filename.to_string()) + } + } + Ok(res) + } } impl FileStore { - pub fn new(root: String) -> Self { - Self { - filesystem_persister: FilesystemPersister::new(root) + pub fn new(root: String) -> Self { + Self { + filesystem_persister: FilesystemPersister::new(root), + } } - } } pub enum AnyKVStore { - File(FileStore) + File(FileStore), } impl KVStorePersister for AnyKVStore { - fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { - match self { - AnyKVStore::File(store) => store.persist(key, object) + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + match self { + AnyKVStore::File(store) => store.persist(key, object), + } } - } } impl KVStoreReader for AnyKVStore { - fn read(&self, key: &str) -> std::io::Result>> { - match self { - AnyKVStore::File(store) => store.read(key) + fn read(&self, key: &str) -> std::io::Result>> { + match self { + AnyKVStore::File(store) => store.read(key), + } } - } - fn list(&self, key: &str) -> std::io::Result> { - match self { - AnyKVStore::File(store) => store.list(key) + fn list(&self, key: &str) -> std::io::Result> { + match self { + AnyKVStore::File(store) => store.list(key), + } } - } } pub struct SenseiPersister { - store: AnyKVStore, - network: Network, + store: AnyKVStore, + network: Network, } impl SenseiPersister { - pub fn new(store: AnyKVStore, network: Network) -> Self { - Self { store, network } - } - - pub fn read_channel_manager(&self) -> std::io::Result>> { - self.store.read("manager") - } - - pub fn read_network_graph(&self) -> NetworkGraph { - if let Ok(Some(contents)) = self.store.read("network_graph") { - let mut cursor = Cursor::new(contents); - if let Ok(graph) = NetworkGraph::read(&mut cursor) { - return graph; - } - } - - let genesis_hash = genesis_block(self.network).header.block_hash(); - NetworkGraph::new(genesis_hash) - } - - pub fn read_scorer(&self, network_graph: Arc) -> ProbabilisticScorer> { - let params = ProbabilisticScoringParameters::default(); - if let Ok(Some(contents)) = self.store.read("scorer") { - let mut cursor = Cursor::new(contents); - if let Ok(scorer) = ProbabilisticScorer::read(&mut cursor, (params, Arc::clone(&network_graph))) { - return scorer; - } - } - ProbabilisticScorer::new(params, network_graph) - } - - pub fn persist_scorer(&self, scorer: &ProbabilisticScorer>) -> std::io::Result<()> { - self.store.persist("scorer", scorer) - } - - fn get_raw_channel_peer_data(&self) -> String { - if let Ok(Some(contents)) = self.store.read("channel_peer_data") { - if let Ok(channel_peer_data) = String::read(&mut Cursor::new(contents)) { - return channel_peer_data - } - } - - String::new() - } - - pub fn persist_channel_peer(&self, peer_info: &str) -> std::io::Result<()> { - let mut peer_data = self.get_raw_channel_peer_data(); - peer_data.push_str( peer_info); - peer_data.push_str("\n"); - self.store.persist("channel_peer_data", &peer_data) - } - - pub fn read_channel_peer_data(&self) -> Result, std::io::Error> { - let mut peer_data = HashMap::new(); - let raw_peer_data = self.get_raw_channel_peer_data(); - for line in raw_peer_data.lines() { - match node::parse_peer_info(line.to_string()) { - Ok((pubkey, socket_addr)) => { - peer_data.insert(pubkey, socket_addr); + pub fn new(store: AnyKVStore, network: Network) -> Self { + Self { store, network } + } + + pub fn read_channel_manager(&self) -> std::io::Result>> { + self.store.read("manager") + } + + pub fn read_network_graph(&self) -> NetworkGraph { + if let Ok(Some(contents)) = self.store.read("network_graph") { + let mut cursor = Cursor::new(contents); + if let Ok(graph) = NetworkGraph::read(&mut cursor) { + return graph; } - Err(e) => return Err(e), } + + let genesis_hash = genesis_block(self.network).header.block_hash(); + NetworkGraph::new(genesis_hash) + } + + pub fn read_scorer( + &self, + network_graph: Arc, + ) -> ProbabilisticScorer> { + let params = ProbabilisticScoringParameters::default(); + if let Ok(Some(contents)) = self.store.read("scorer") { + let mut cursor = Cursor::new(contents); + if let Ok(scorer) = + ProbabilisticScorer::read(&mut cursor, (params, Arc::clone(&network_graph))) + { + return scorer; + } + } + ProbabilisticScorer::new(params, network_graph) + } + + pub fn persist_scorer( + &self, + scorer: &ProbabilisticScorer>, + ) -> std::io::Result<()> { + self.store.persist("scorer", scorer) + } + + fn get_raw_channel_peer_data(&self) -> String { + if let Ok(Some(contents)) = self.store.read("channel_peer_data") { + if let Ok(channel_peer_data) = String::read(&mut Cursor::new(contents)) { + return channel_peer_data; + } + } + + String::new() + } + + pub fn persist_channel_peer(&self, peer_info: &str) -> std::io::Result<()> { + let mut peer_data = self.get_raw_channel_peer_data(); + peer_data.push_str(peer_info); + peer_data.push_str("\n"); + self.store.persist("channel_peer_data", &peer_data) + } + + pub fn read_channel_peer_data(&self) -> Result, std::io::Error> { + let mut peer_data = HashMap::new(); + let raw_peer_data = self.get_raw_channel_peer_data(); + for line in raw_peer_data.lines() { + match node::parse_peer_info(line.to_string()) { + Ok((pubkey, socket_addr)) => { + peer_data.insert(pubkey, socket_addr); + } + Err(e) => return Err(e), + } + } + Ok(peer_data) + } + + /// Read `ChannelMonitor`s from disk. + pub fn read_channelmonitors( + &self, + keys_manager: K, + ) -> Result)>, std::io::Error> + where + K::Target: KeysInterface + Sized, + { + let filenames = self.store.list("monitors").unwrap(); + + let mut res = Vec::new(); + for filename in filenames { + if !filename.is_ascii() || filename.len() < 65 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid ChannelMonitor file name", + )); + } + if filename.ends_with(".tmp") { + // If we were in the middle of committing an new update and crashed, it should be + // safe to ignore the update - we should never have returned to the caller and + // irrevocably committed to the new state in any way. + continue; + } + + let txid = Txid::from_hex(filename.split_at(64).0); + if txid.is_err() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid tx ID in filename", + )); + } + + let index: Result = filename.split_at(65).1.parse(); + if index.is_err() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid tx index in filename", + )); + } + + let monitor_path = format!("monitors/{}", filename); + let contents = self.store.read(&monitor_path)?.unwrap(); + let mut buffer = Cursor::new(&contents); + match <(BlockHash, ChannelMonitor)>::read(&mut buffer, &*keys_manager) { + Ok((blockhash, channel_monitor)) => { + if channel_monitor.get_funding_txo().0.txid != txid.unwrap() + || channel_monitor.get_funding_txo().0.index != index.unwrap() + { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "ChannelMonitor was stored in the wrong file", + )); + } + res.push((blockhash, channel_monitor)); + } + Err(e) => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to deserialize ChannelMonitor: {}", e), + )) + } + } + } + Ok(res) } - Ok(peer_data) - } - - /// Read `ChannelMonitor`s from disk. - pub fn read_channelmonitors ( - &self, keys_manager: K - ) -> Result)>, std::io::Error> - where K::Target: KeysInterface + Sized, - { - let filenames = self.store.list("monitors").unwrap(); - - let mut res = Vec::new(); - for filename in filenames { - if !filename.is_ascii() || filename.len() < 65 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid ChannelMonitor file name", - )); - } - if filename.ends_with(".tmp") { - // If we were in the middle of committing an new update and crashed, it should be - // safe to ignore the update - we should never have returned to the caller and - // irrevocably committed to the new state in any way. - continue; - } - - let txid = Txid::from_hex(filename.split_at(64).0); - if txid.is_err() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid tx ID in filename", - )); - } - - let index: Result = filename.split_at(65).1.parse(); - if index.is_err() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid tx index in filename", - )); - } - - let monitor_path = format!("monitors/{}", filename); - let contents = self.store.read(&monitor_path)?.unwrap(); - let mut buffer = Cursor::new(&contents); - match <(BlockHash, ChannelMonitor)>::read(&mut buffer, &*keys_manager) { - Ok((blockhash, channel_monitor)) => { - if channel_monitor.get_funding_txo().0.txid != txid.unwrap() || channel_monitor.get_funding_txo().0.index != index.unwrap() { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "ChannelMonitor was stored in the wrong file")); - } - res.push((blockhash, channel_monitor)); - } - Err(e) => return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Failed to deserialize ChannelMonitor: {}", e), - )) - } - } - Ok(res) - } } impl KVStorePersister for SenseiPersister { - fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { - self.store.persist(key, object) - } -} \ No newline at end of file + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + self.store.persist(key, object) + } +} diff --git a/src/main.rs b/src/main.rs index 8c2c25c..c5460dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,10 +23,10 @@ mod services; mod utils; use crate::chain::bitcoind_client::BitcoindClient; -use crate::{config::SenseiConfig, chain::manager::SenseiChainManager}; use crate::database::admin::AdminDatabase; use crate::http::admin::add_routes as add_admin_routes; use crate::http::node::add_routes as add_node_routes; +use crate::{chain::manager::SenseiChainManager, config::SenseiConfig}; use ::http::{ header::{self, ACCEPT, AUTHORIZATION, CONTENT_TYPE, COOKIE}, Method, Uri, @@ -172,19 +172,23 @@ async fn main() { .expect("invalid bitcoind rpc config"), ); - let chain_manager = Arc::new(SenseiChainManager::new( - config.clone(), - bitcoind_client.clone(), - bitcoind_client.clone(), - bitcoind_client - ).await.unwrap()); + let chain_manager = Arc::new( + SenseiChainManager::new( + config.clone(), + bitcoind_client.clone(), + bitcoind_client.clone(), + bitcoind_client, + ) + .await + .unwrap(), + ); let admin_service = AdminService::new( &sensei_dir, config.clone(), node_directory.clone(), database, - chain_manager + chain_manager, ) .await; diff --git a/src/node.rs b/src/node.rs index 6aa5201..28a24f2 100644 --- a/src/node.rs +++ b/src/node.rs @@ -13,15 +13,15 @@ use crate::chain::listener_database::ListenerDatabase; use crate::chain::manager::SenseiChainManager; use crate::config::LightningNodeConfig; use crate::database::node::NodeDatabase; -use crate::disk::{FilesystemLogger}; +use crate::disk::FilesystemLogger; use crate::error::Error; use crate::event_handler::LightningNodeEventHandler; use crate::lib::network_graph::OptionalNetworkGraphMsgHandler; +use crate::lib::persist::{AnyKVStore, FileStore, SenseiPersister}; use crate::services::node::{Channel, NodeInfo, NodeRequest, NodeRequestError, NodeResponse, Peer}; use crate::services::{PaginationRequest, PaginationResponse, PaymentsFilter}; use crate::utils::PagedVec; -use crate::{database, disk, hex_utils}; -use crate::lib::persist::{AnyKVStore, FileStore, SenseiPersister}; +use crate::{database, hex_utils}; use bdk::database::SqliteDatabase; use bdk::keys::ExtendedKey; use bdk::wallet::AddressIndex; @@ -30,12 +30,10 @@ use bitcoin::hashes::Hash; use lightning::chain::channelmonitor::ChannelMonitor; use lightning::ln::msgs::NetAddress; -use lightning::util::persist::Persister; use lightning_invoice::payment::PaymentError; use tindercrypt::cryptors::RingCryptor; use bdk::template::DescriptorTemplateOut; -use bitcoin::blockdata::constants::genesis_block; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::network::constants::Network; use bitcoin::secp256k1::PublicKey; @@ -64,10 +62,9 @@ use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; use std::fmt::Display; use std::fs::File; -use std::io::{Read, Cursor}; use std::io::Write; +use std::io::{Cursor, Read}; use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; -use std::path::Path; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -258,7 +255,7 @@ pub struct LightningNode { pub invoice_payer: Arc, pub scorer: Arc, Instant>>>, pub stop_listen: Arc, - pub persister: Arc + pub persister: Arc, } impl LightningNode { @@ -277,7 +274,7 @@ impl LightningNode { if decrypted_seed.len() != 32 { return Err(Error::InvalidSeedLength); } - seed.copy_from_slice(decrypted_seed.as_slice()); + seed.copy_from_slice(decrypted_seed.as_slice()); } None => { thread_rng().fill_bytes(&mut seed); @@ -407,7 +404,7 @@ impl LightningNode { let logger = Arc::new(FilesystemLogger::new(data_dir.clone())); let fee_estimator = Arc::new(SenseiFeeEstimator { - fee_estimator: chain_manager.fee_estimator.clone() + fee_estimator: chain_manager.fee_estimator.clone(), }); let broadcaster = Arc::new(SenseiBroadcaster { @@ -416,7 +413,10 @@ impl LightningNode { }); let persistence_store = FileStore::new(data_dir); - let persister = Arc::new(SenseiPersister::new(AnyKVStore::File(persistence_store), config.network.clone())); + let persister = Arc::new(SenseiPersister::new( + AnyKVStore::File(persistence_store), + config.network.clone(), + )); let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos())); let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( @@ -548,7 +548,7 @@ impl LightningNode { let network_graph = match network_graph { Some(network_graph) => network_graph, - None => Arc::new(persister.read_network_graph()) + None => Arc::new(persister.read_network_graph()), }; let network_graph_msg_handler: Arc = @@ -587,7 +587,9 @@ impl LightningNode { Arc::new(IgnoringMessageHandler {}), )); - let scorer = Arc::new(Mutex::new(persister.read_scorer(Arc::clone(&network_graph)))); + let scorer = Arc::new(Mutex::new( + persister.read_scorer(Arc::clone(&network_graph)), + )); let router = DefaultRouter::new( network_graph.clone(), @@ -644,7 +646,7 @@ impl LightningNode { scorer, invoice_payer, stop_listen, - persister + persister, }) } @@ -685,7 +687,8 @@ impl LightningNode { let mut interval = tokio::time::interval(Duration::from_secs(600)); loop { interval.tick().await; - if scorer_persister.persist_scorer(&scorer_persist.lock().unwrap()) + if scorer_persister + .persist_scorer(&scorer_persist.lock().unwrap()) .is_err() { // Persistence errors here are non-fatal as channels will be re-scored as payments @@ -1267,9 +1270,7 @@ impl LightningNode { let res = self.open_channel(pubkey, amt_satoshis, 0, 0, public); if res.is_ok() { - let _ = self.persister.persist_channel_peer( - &node_connection_string, - ); + let _ = self.persister.persist_channel_peer(&node_connection_string); } Ok(NodeResponse::OpenChannel {}) diff --git a/src/services/admin.rs b/src/services/admin.rs index c58cb3c..a2851f3 100644 --- a/src/services/admin.rs +++ b/src/services/admin.rs @@ -8,7 +8,6 @@ // licenses. use super::{PaginationRequest, PaginationResponse}; -use crate::chain::bitcoind_client::BitcoindClient; use crate::chain::manager::SenseiChainManager; use crate::database::admin::AccessToken; use crate::database::{ @@ -137,7 +136,7 @@ impl AdminService { config: SenseiConfig, node_directory: NodeDirectory, database: AdminDatabase, - chain_manager: Arc + chain_manager: Arc, ) -> Self { Self { data_dir: String::from(data_dir), From ea5a6134d2f6a419033cfc71de70a0551176e44c Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Tue, 19 Apr 2022 10:31:07 -0400 Subject: [PATCH 05/10] database kv persistence --- Cargo.lock | 6 +++++ Cargo.toml | 12 +++++----- src/config.rs | 10 +++++++++ src/database/mod.rs | 12 ++++++++++ src/database/node.rs | 49 +++++++++++++++++++++++++++++++++++++++++ src/event_handler.rs | 1 + src/lib/persist.rs | 51 ++++++++++++++++++++++++++++++++++++++++--- src/main.rs | 10 +++++++++ src/node.rs | 32 ++++++++++++++++----------- src/services/admin.rs | 3 ++- 10 files changed, 163 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7c3871..1803da9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1075,6 +1075,7 @@ dependencies = [ [[package]] name = "lightning" version = "0.0.106" +source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "secp256k1", @@ -1083,6 +1084,7 @@ dependencies = [ [[package]] name = "lightning-background-processor" version = "0.0.106" +source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "lightning", @@ -1091,6 +1093,7 @@ dependencies = [ [[package]] name = "lightning-block-sync" version = "0.0.106" +source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "chunked_transfer", @@ -1103,6 +1106,7 @@ dependencies = [ [[package]] name = "lightning-invoice" version = "0.14.0" +source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bech32", "bitcoin_hashes", @@ -1114,6 +1118,7 @@ dependencies = [ [[package]] name = "lightning-net-tokio" version = "0.0.106" +source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "lightning", @@ -1123,6 +1128,7 @@ dependencies = [ [[package]] name = "lightning-persister" version = "0.0.106" +source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "libc", diff --git a/Cargo.toml b/Cargo.toml index 293f3a8..f1c9f0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,12 @@ name = "senseid" path = "src/main.rs" [dependencies] -lightning = { version = "0.0.106", features = ["max_level_trace"], path = "/Users/developer/Development/rust-lightning/lightning" } -lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ], path = "/Users/developer/Development/rust-lightning/lightning-block-sync" } -lightning-invoice = { version = "0.14.0", path = "/Users/developer/Development/rust-lightning/lightning-invoice" } -lightning-net-tokio = { version = "0.0.106", path = "/Users/developer/Development/rust-lightning/lightning-net-tokio" } -lightning-persister = { version = "0.0.106", path = "/Users/developer/Development/rust-lightning/lightning-persister" } -lightning-background-processor = { version = "0.0.106", path = "/Users/developer/Development/rust-lightning/lightning-background-processor" } +lightning = { version = "0.0.106", features = ["max_level_trace"], git = "https://github.com/lightningdevkit/rust-lightning" } +lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ], git = "https://github.com/lightningdevkit/rust-lightning" } +lightning-invoice = { version = "0.14.0", git = "https://github.com/lightningdevkit/rust-lightning" } +lightning-net-tokio = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning" } +lightning-persister = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning" } +lightning-background-processor = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning" } base64 = "0.13.0" bitcoin = "0.27" diff --git a/src/config.rs b/src/config.rs index 7537773..f0932bd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,12 @@ use bitcoin::Network; use serde::{Deserialize, Serialize}; use serde_json::Value; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum KVPersistence { + Filesystem, + Database, +} + #[derive(Clone, Serialize, Deserialize)] pub struct SenseiConfig { #[serde(skip)] @@ -25,6 +31,7 @@ pub struct SenseiConfig { pub api_port: u16, pub port_range_min: u16, pub port_range_max: u16, + pub kv_persistence: KVPersistence, } impl Default for SenseiConfig { @@ -41,6 +48,7 @@ impl Default for SenseiConfig { api_port: 5401, port_range_min: 1024, port_range_max: 65535, + kv_persistence: KVPersistence::Filesystem, } } } @@ -103,6 +111,7 @@ pub struct LightningNodeConfig { pub network: Network, pub passphrase: String, pub external_router: bool, + pub kv_persistence: KVPersistence, } impl Default for LightningNodeConfig { @@ -115,6 +124,7 @@ impl Default for LightningNodeConfig { network: Network::Bitcoin, passphrase: "satoshi".into(), external_router: true, + kv_persistence: KVPersistence::Filesystem, } } } diff --git a/src/database/mod.rs b/src/database/mod.rs index 22bf7e6..471d8a2 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,3 +1,5 @@ +use std::io::ErrorKind; + // This file is Copyright its original authors, visible in version control // history. // @@ -21,3 +23,13 @@ impl From for Error { Error::Encode(e) } } + +impl From for std::io::Error { + fn from(e: Error) -> std::io::Error { + let error_message = match e { + Error::Generic(str) => str, + Error::Encode(e) => e.to_string(), + }; + std::io::Error::new(ErrorKind::Other, error_message) + } +} diff --git a/src/database/node.rs b/src/database/node.rs index c4f834d..3974382 100644 --- a/src/database/node.rs +++ b/src/database/node.rs @@ -78,6 +78,8 @@ static MIGRATIONS: &[&str] = &[ "CREATE INDEX idx_to_channel_id ON forwarded_payments(to_channel_id)", "CREATE UNIQUE INDEX idx_hours_since_epoch ON forwarded_payments(hours_since_epoch, from_channel_id, to_channel_id)", "CREATE TABLE last_sync (blockhash BLOB)", + "CREATE TABLE kv_store (k TEXT, v BLOB)", + "CREATE UNIQUE INDEX idx_k ON kv_store(k)" ]; pub struct NodeDatabase { @@ -319,6 +321,53 @@ impl NodeDatabase { } } + pub fn set_value(&self, key: String, value: Vec) -> Result<(), Error> { + let mut statement = self.connection.prepare_cached( + " + INSERT INTO kv_store (k, v) + VALUES (:key, :value) + ON CONFLICT + DO UPDATE SET v = excluded.v + ", + )?; + + statement.execute(named_params! { + ":key": key, + ":value": value, + })?; + + Ok(()) + } + + pub fn get_keys(&self, pattern: String) -> Result, Error> { + let mut statement = self + .connection + .prepare_cached("SELECT k FROM kv_store WHERE k LIKE :pattern")?; + + let mut rows = statement.query(named_params! { ":pattern": pattern })?; + let mut keys = vec![]; + while let Some(row) = rows.next()? { + keys.push(row.get(0)?) + } + + Ok(keys) + } + + pub fn get_value(&self, key: String) -> Result>, Error> { + let mut statement = self + .connection + .prepare_cached("SELECT v FROM kv_store WHERE k=:key")?; + + let mut rows = statement.query(named_params! { ":key": key })?; + + let row = rows.next()?; + + match row { + Some(row) => Ok(Some(row.get(0)?)), + None => Ok(None), + } + } + pub fn get_payments( &self, pagination: PaginationRequest, diff --git a/src/event_handler.rs b/src/event_handler.rs index 0527344..67c8b73 100644 --- a/src/event_handler.rs +++ b/src/event_handler.rs @@ -222,6 +222,7 @@ impl EventHandler for LightningNodeEventHandler { Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx, + source_channel_id: _, } => { let from_onchain_str = if *claim_from_onchain_tx { "from onchain downstream claim" diff --git a/src/lib/persist.rs b/src/lib/persist.rs index f4f81c0..c3243db 100644 --- a/src/lib/persist.rs +++ b/src/lib/persist.rs @@ -5,7 +5,7 @@ use std::{ net::SocketAddr, ops::Deref, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, Mutex}, }; use bitcoin::secp256k1::key::PublicKey; @@ -29,13 +29,54 @@ use lightning::{ }; use lightning_persister::FilesystemPersister; -use crate::node; +use crate::{database::node::NodeDatabase, node}; pub trait KVStoreReader { fn read(&self, key: &str) -> std::io::Result>>; fn list(&self, key: &str) -> std::io::Result>; } +pub struct DatabaseStore { + database: Arc>, +} + +impl KVStorePersister for DatabaseStore { + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + let database = self.database.lock().unwrap(); + database + .set_value(key.to_string(), object.encode()) + .map_err(|e| e.into()) + } +} + +impl KVStoreReader for DatabaseStore { + fn read(&self, key: &str) -> std::io::Result>> { + let database = self.database.lock().unwrap(); + database.get_value(key.to_string()).map_err(|e| e.into()) + } + + fn list(&self, key: &str) -> std::io::Result> { + let pattern = format!("{}%", key); + let database = self.database.lock().unwrap(); + database + .get_keys(pattern) + .map(|full_keys| { + let replace_str = format!("{}/", key); + full_keys + .iter() + .map(|full_key| full_key.replace(&replace_str, "")) + .collect() + }) + .map_err(|e| e.into()) + } +} + +impl DatabaseStore { + pub fn new(database: Arc>) -> Self { + Self { database } + } +} + pub struct FileStore { filesystem_persister: FilesystemPersister, } @@ -83,12 +124,14 @@ impl FileStore { pub enum AnyKVStore { File(FileStore), + Database(DatabaseStore), } impl KVStorePersister for AnyKVStore { fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { match self { AnyKVStore::File(store) => store.persist(key, object), + AnyKVStore::Database(store) => store.persist(key, object), } } } @@ -97,12 +140,14 @@ impl KVStoreReader for AnyKVStore { fn read(&self, key: &str) -> std::io::Result>> { match self { AnyKVStore::File(store) => store.read(key), + AnyKVStore::Database(store) => store.read(key), } } fn list(&self, key: &str) -> std::io::Result> { match self { AnyKVStore::File(store) => store.list(key), + AnyKVStore::Database(store) => store.list(key), } } } @@ -169,7 +214,7 @@ impl SenseiPersister { pub fn persist_channel_peer(&self, peer_info: &str) -> std::io::Result<()> { let mut peer_data = self.get_raw_channel_peer_data(); peer_data.push_str(peer_info); - peer_data.push_str("\n"); + peer_data.push('\n'); self.store.persist("channel_peer_data", &peer_data) } diff --git a/src/main.rs b/src/main.rs index c5460dd..d465a3a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,6 +40,7 @@ use axum::{ AddExtensionLayer, Router, }; use clap::Parser; +use config::KVPersistence; use rust_embed::RustEmbed; use std::net::SocketAddr; @@ -98,6 +99,8 @@ struct SenseiArgs { port_range_max: Option, #[clap(long, env = "API_PORT")] api_port: Option, + #[clap(long, env = "KV_PERSISTENCE")] + kv_persistence: Option, } pub type AdminRequestResponse = (AdminRequest, Sender); @@ -152,6 +155,13 @@ async fn main() { if let Some(api_port) = args.api_port { config.api_port = api_port; } + if let Some(kv_persistence) = args.kv_persistence { + config.kv_persistence = match kv_persistence.as_str() { + "filesystem" => KVPersistence::Filesystem, + "database" => KVPersistence::Database, + _ => panic!("invalid kv_persistence value"), + }; + } let sqlite_path = format!("{}/{}/admin.db", sensei_dir, config.network); let mut database = AdminDatabase::new(sqlite_path); diff --git a/src/node.rs b/src/node.rs index 28a24f2..11e095a 100644 --- a/src/node.rs +++ b/src/node.rs @@ -11,13 +11,13 @@ use crate::chain::broadcaster::SenseiBroadcaster; use crate::chain::fee_estimator::SenseiFeeEstimator; use crate::chain::listener_database::ListenerDatabase; use crate::chain::manager::SenseiChainManager; -use crate::config::LightningNodeConfig; +use crate::config::{KVPersistence, LightningNodeConfig}; use crate::database::node::NodeDatabase; use crate::disk::FilesystemLogger; use crate::error::Error; use crate::event_handler::LightningNodeEventHandler; use crate::lib::network_graph::OptionalNetworkGraphMsgHandler; -use crate::lib::persist::{AnyKVStore, FileStore, SenseiPersister}; +use crate::lib::persist::{AnyKVStore, DatabaseStore, FileStore, SenseiPersister}; use crate::services::node::{Channel, NodeInfo, NodeRequest, NodeRequestError, NodeResponse, Peer}; use crate::services::{PaginationRequest, PaginationResponse, PaymentsFilter}; use crate::utils::PagedVec; @@ -289,8 +289,10 @@ impl LightningNode { seed: &[u8], pubkey: String, macaroon_path: String, - database: &mut NodeDatabase, + database: Arc>, ) -> Result { + let mut database = database.lock().unwrap(); + match File::open(macaroon_path.clone()) { Ok(mut file) => { let mut bytes: Vec = Vec::new(); @@ -412,11 +414,15 @@ impl LightningNode { listener_database: listener_database.clone(), }); - let persistence_store = FileStore::new(data_dir); - let persister = Arc::new(SenseiPersister::new( - AnyKVStore::File(persistence_store), - config.network.clone(), - )); + let database = Arc::new(Mutex::new(node_database)); + + let persistence_store = match config.kv_persistence { + KVPersistence::Filesystem => AnyKVStore::File(FileStore::new(data_dir)), + KVPersistence::Database => AnyKVStore::Database(DatabaseStore::new(database.clone())), + }; + + let persister = Arc::new(SenseiPersister::new(persistence_store, config.network)); + let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos())); let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( @@ -506,8 +512,10 @@ impl LightningNode { chain_listeners.push((block_hash, monitor as &(dyn chain::Listen + Send + Sync))); } - let bdk_database_last_sync = - node_database.find_or_create_last_sync(best_block.block_hash())?; + let bdk_database_last_sync = { + let mut db = database.lock().unwrap(); + db.find_or_create_last_sync(best_block.block_hash())? + }; chain_listeners.push(( bdk_database_last_sync, @@ -603,11 +611,9 @@ impl LightningNode { &seed, pubkey, admin_macaroon_path, - &mut node_database, + database.clone(), )?; - let database = Arc::new(Mutex::new(node_database)); - let event_handler = Arc::new(LightningNodeEventHandler { config: config.clone(), wallet: bdk_wallet.clone(), diff --git a/src/services/admin.rs b/src/services/admin.rs index a2851f3..fa95708 100644 --- a/src/services/admin.rs +++ b/src/services/admin.rs @@ -140,7 +140,7 @@ impl AdminService { ) -> Self { Self { data_dir: String::from(data_dir), - config: Arc::new(config.clone()), + config: Arc::new(config), node_directory, database: Arc::new(Mutex::new(database)), chain_manager, @@ -524,6 +524,7 @@ impl AdminService { network: self.config.network, passphrase, external_router, + kv_persistence: self.config.kv_persistence.clone(), } } From 35f7919ea942bba2e2622d1d643e23ba1fd8c316 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Thu, 21 Apr 2022 07:15:27 -0400 Subject: [PATCH 06/10] remove unused bitcoind client code --- src/chain/bitcoind_client.rs | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/src/chain/bitcoind_client.rs b/src/chain/bitcoind_client.rs index ed1d68f..d831231 100644 --- a/src/chain/bitcoind_client.rs +++ b/src/chain/bitcoind_client.rs @@ -57,10 +57,6 @@ impl TryInto for JsonResponse { } pub struct BitcoindClient { bitcoind_rpc_client: Arc>, - host: String, - port: u16, - rpc_user: String, - rpc_password: String, fees: Arc>, handle: tokio::runtime::Handle, } @@ -127,10 +123,6 @@ impl BitcoindClient { fees.insert(Target::HighPriority, AtomicU32::new(5000)); let client = Self { bitcoind_rpc_client: Arc::new(Mutex::new(bitcoind_rpc_client)), - host, - port, - rpc_user, - rpc_password, fees: Arc::new(fees), handle: handle.clone(), }; @@ -214,32 +206,6 @@ impl BitcoindClient { } }); } - - pub fn get_new_rpc_client(&self) -> std::io::Result { - let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port); - let rpc_credentials = base64::encode(format!( - "{}:{}", - self.rpc_user.clone(), - self.rpc_password.clone() - )); - RpcClient::new(&rpc_credentials, http_endpoint) - } - - pub async fn send_raw_transaction(&self, raw_tx: String) { - let rpc = self.bitcoind_rpc_client.lock().await; - - let raw_tx_json = serde_json::json!(raw_tx); - rpc.call_method::("sendrawtransaction", &[raw_tx_json]) - .await - .unwrap(); - } - - pub async fn get_blockchain_info(&self) -> BlockchainInfo { - let rpc = self.bitcoind_rpc_client.lock().await; - rpc.call_method::("getblockchaininfo", &[]) - .await - .unwrap() - } } impl FeeEstimator for BitcoindClient { From 178446816f4231b2b6055f12aa2b1df4a800afa0 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Thu, 21 Apr 2022 10:40:09 -0400 Subject: [PATCH 07/10] fix host mapping by moving it to connection string parser --- src/lib/persist.rs | 4 ++-- src/node.rs | 37 ++++++++++++++++--------------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/lib/persist.rs b/src/lib/persist.rs index c3243db..d9bc665 100644 --- a/src/lib/persist.rs +++ b/src/lib/persist.rs @@ -218,11 +218,11 @@ impl SenseiPersister { self.store.persist("channel_peer_data", &peer_data) } - pub fn read_channel_peer_data(&self) -> Result, std::io::Error> { + pub async fn read_channel_peer_data(&self) -> Result, std::io::Error> { let mut peer_data = HashMap::new(); let raw_peer_data = self.get_raw_channel_peer_data(); for line in raw_peer_data.lines() { - match node::parse_peer_info(line.to_string()) { + match node::parse_peer_info(line.to_string()).await { Ok((pubkey, socket_addr)) => { peer_data.insert(pubkey, socket_addr); } diff --git a/src/node.rs b/src/node.rs index 11e095a..90ff29d 100644 --- a/src/node.rs +++ b/src/node.rs @@ -727,7 +727,7 @@ impl LightningNode { let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { interval.tick().await; - match persister_peer.read_channel_peer_data() { + match persister_peer.read_channel_peer_data().await { Ok(mut info) => { for (pubkey, peer_addr) in info.drain() { for chan_info in channel_manager_reconnect.list_channels() { @@ -852,17 +852,10 @@ impl LightningNode { } pub async fn connect_to_peer(&self, pubkey: PublicKey, addr: SocketAddr) -> Result<(), Error> { - let listen_addr = public_ip::addr().await.unwrap(); - - let connect_address = match listen_addr == addr.ip() { - true => format!("127.0.0.1:{}", addr.port()).parse().unwrap(), - false => addr, - }; - match lightning_net_tokio::connect_outbound( Arc::clone(&self.peer_manager), pubkey, - connect_address, + addr, ) .await { @@ -1261,7 +1254,7 @@ impl LightningNode { amt_satoshis, public, } => { - let (pubkey, addr) = parse_peer_info(node_connection_string.clone())?; + let (pubkey, addr) = parse_peer_info(node_connection_string.clone()).await?; let found_peer = self .peer_manager @@ -1320,7 +1313,7 @@ impl LightningNode { NodeRequest::ConnectPeer { node_connection_string, } => { - let (pubkey, addr) = parse_peer_info(node_connection_string)?; + let (pubkey, addr) = parse_peer_info(node_connection_string).await?; let found_peer = self .peer_manager @@ -1384,7 +1377,7 @@ impl LightningNode { } } -pub fn parse_peer_info( +pub async fn parse_peer_info( peer_pubkey_and_ip_addr: String, ) -> Result<(PublicKey, SocketAddr), std::io::Error> { let mut pubkey_and_addr = peer_pubkey_and_ip_addr.split('@'); @@ -1416,7 +1409,16 @@ pub fn parse_peer_info( )); } - Ok((pubkey.unwrap(), peer_addr.unwrap().unwrap())) + let addr = peer_addr.unwrap().unwrap(); + + let listen_addr = public_ip::addr().await.unwrap(); + + let connect_address = match listen_addr == addr.ip() { + true => format!("127.0.0.1:{}", addr.port()).parse().unwrap(), + false => addr, + }; + + Ok((pubkey.unwrap(), connect_address)) } pub(crate) async fn connect_peer_if_necessary( @@ -1430,14 +1432,7 @@ pub(crate) async fn connect_peer_if_necessary( } } - let listen_addr = public_ip::addr().await.unwrap(); - - let connect_address = match listen_addr == peer_addr.ip() { - true => format!("127.0.0.1:{}", peer_addr.port()).parse().unwrap(), - false => peer_addr, - }; - - match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, connect_address) + match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, peer_addr) .await { Some(connection_closed_future) => { From 08ef4106caf7ce492ad134d35a1fb686113c8a13 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Thu, 21 Apr 2022 11:12:18 -0400 Subject: [PATCH 08/10] fix duplicate utxo issue --- src/chain/listener_database.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/chain/listener_database.rs b/src/chain/listener_database.rs index 376ee50..07459c9 100644 --- a/src/chain/listener_database.rs +++ b/src/chain/listener_database.rs @@ -98,13 +98,17 @@ impl ListenerDatabase { .get_path_from_script_pubkey(&output.script_pubkey) .unwrap() { - database - .set_utxo(&LocalUtxo { - outpoint: OutPoint::new(tx.txid(), i as u32), - txout: output.clone(), - keychain, - }) - .unwrap(); + let outpoint = OutPoint::new(tx.txid(), i as u32); + let existing_utxo = database.get_utxo(&outpoint).unwrap(); + if existing_utxo.is_none() { + database + .set_utxo(&LocalUtxo { + outpoint: OutPoint::new(tx.txid(), i as u32), + txout: output.clone(), + keychain, + }) + .unwrap(); + } incoming += output.value; // TODO: implement this From a64278d58958a9f58e7b2fc8695feba5f70c848a Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Thu, 21 Apr 2022 11:15:49 -0400 Subject: [PATCH 09/10] formatting --- src/lib/persist.rs | 4 +++- src/node.rs | 11 +++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/lib/persist.rs b/src/lib/persist.rs index d9bc665..47df976 100644 --- a/src/lib/persist.rs +++ b/src/lib/persist.rs @@ -218,7 +218,9 @@ impl SenseiPersister { self.store.persist("channel_peer_data", &peer_data) } - pub async fn read_channel_peer_data(&self) -> Result, std::io::Error> { + pub async fn read_channel_peer_data( + &self, + ) -> Result, std::io::Error> { let mut peer_data = HashMap::new(); let raw_peer_data = self.get_raw_channel_peer_data(); for line in raw_peer_data.lines() { diff --git a/src/node.rs b/src/node.rs index 90ff29d..094e1e2 100644 --- a/src/node.rs +++ b/src/node.rs @@ -852,12 +852,8 @@ impl LightningNode { } pub async fn connect_to_peer(&self, pubkey: PublicKey, addr: SocketAddr) -> Result<(), Error> { - match lightning_net_tokio::connect_outbound( - Arc::clone(&self.peer_manager), - pubkey, - addr, - ) - .await + match lightning_net_tokio::connect_outbound(Arc::clone(&self.peer_manager), pubkey, addr) + .await { Some(connection_closed_future) => { let mut connection_closed_future = Box::pin(connection_closed_future); @@ -1432,8 +1428,7 @@ pub(crate) async fn connect_peer_if_necessary( } } - match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, peer_addr) - .await + match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, peer_addr).await { Some(connection_closed_future) => { let mut connection_closed_future = Box::pin(connection_closed_future); From 198b6032c71e2d6deb3db4b513903fa96c48a4d7 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Thu, 21 Apr 2022 11:30:43 -0400 Subject: [PATCH 10/10] fix ldk revision until 107 lands --- Cargo.lock | 12 ++++++------ Cargo.toml | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1803da9..cf43677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1075,7 +1075,7 @@ dependencies = [ [[package]] name = "lightning" version = "0.0.106" -source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" +source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "secp256k1", @@ -1084,7 +1084,7 @@ dependencies = [ [[package]] name = "lightning-background-processor" version = "0.0.106" -source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" +source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "lightning", @@ -1093,7 +1093,7 @@ dependencies = [ [[package]] name = "lightning-block-sync" version = "0.0.106" -source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" +source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "chunked_transfer", @@ -1106,7 +1106,7 @@ dependencies = [ [[package]] name = "lightning-invoice" version = "0.14.0" -source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" +source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bech32", "bitcoin_hashes", @@ -1118,7 +1118,7 @@ dependencies = [ [[package]] name = "lightning-net-tokio" version = "0.0.106" -source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" +source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "lightning", @@ -1128,7 +1128,7 @@ dependencies = [ [[package]] name = "lightning-persister" version = "0.0.106" -source = "git+https://github.com/lightningdevkit/rust-lightning#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" +source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" dependencies = [ "bitcoin", "libc", diff --git a/Cargo.toml b/Cargo.toml index f1c9f0b..4b75f89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,12 @@ name = "senseid" path = "src/main.rs" [dependencies] -lightning = { version = "0.0.106", features = ["max_level_trace"], git = "https://github.com/lightningdevkit/rust-lightning" } -lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ], git = "https://github.com/lightningdevkit/rust-lightning" } -lightning-invoice = { version = "0.14.0", git = "https://github.com/lightningdevkit/rust-lightning" } -lightning-net-tokio = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning" } -lightning-persister = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning" } -lightning-background-processor = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning" } +lightning = { version = "0.0.106", features = ["max_level_trace"], git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" } +lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ], git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" } +lightning-invoice = { version = "0.14.0", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" } +lightning-net-tokio = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" } +lightning-persister = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" } +lightning-background-processor = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" } base64 = "0.13.0" bitcoin = "0.27"