Skip to content

Immutable BlockSource interface #1307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lightning-block-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rpc-client = [ "serde", "serde_json", "chunked_transfer" ]
[dependencies]
bitcoin = "0.27"
lightning = { version = "0.0.106", path = "../lightning" }
futures = { version = "0.3" }
tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
Expand Down
6 changes: 3 additions & 3 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use lightning::chain;
/// start when there are no chain listeners to sync yet.
///
/// [`SpvClient`]: crate::SpvClient
pub async fn validate_best_block_header<B: BlockSource>(block_source: &mut B) ->
pub async fn validate_best_block_header<B: BlockSource>(block_source: &B) ->
BlockSourceResult<ValidatedBlockHeader> {
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
block_source
Expand Down Expand Up @@ -67,7 +67,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// C: chain::Filter,
/// P: chainmonitor::Persist<S>,
/// >(
/// block_source: &mut B,
/// block_source: &B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
/// config: UserConfig,
/// keys_manager: &K,
Expand Down Expand Up @@ -122,7 +122,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
pub async fn synchronize_listeners<'a, B: BlockSource, C: Cache, L: chain::Listen + ?Sized>(
block_source: &mut B,
block_source: &B,
network: Network,
header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &'a L)>,
Expand Down
6 changes: 3 additions & 3 deletions lightning-block-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ pub trait BlockSource : Sync + Send {
///
/// Implementations that cannot find headers based on the hash should return a `Transient` error
/// when `height_hint` is `None`.
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;

/// Returns the block for a given hash. A headers-only block source should return a `Transient`
/// error.
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;

/// Returns the hash of the best block and, optionally, its height.
///
/// When polling a block source, [`Poll`] implementations may pass the height to [`get_header`]
/// to allow for a more efficient lookup.
///
/// [`get_header`]: Self::get_header
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
}

/// Result type for `BlockSource` requests.
Expand Down
44 changes: 22 additions & 22 deletions lightning-block-sync/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;

use std::ops::DerefMut;
use std::ops::Deref;

/// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving
/// related chain data. It serves as an adapter for `BlockSource`.
Expand All @@ -17,15 +17,15 @@ use std::ops::DerefMut;
/// [`ChainPoller`]: ../struct.ChainPoller.html
pub trait Poll {
/// Returns a chain tip in terms of its relationship to the provided chain tip.
fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>;

/// Returns the header that preceded the given header in the chain.
fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>;

/// Returns the block associated with the given header.
fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>;
}

Expand Down Expand Up @@ -170,12 +170,12 @@ mod sealed {
///
/// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way
/// of validating chain data and checking consistency.
pub struct ChainPoller<B: DerefMut<Target=T> + Sized, T: BlockSource> {
pub struct ChainPoller<B: Deref<Target=T> + Sized, T: BlockSource> {
block_source: B,
network: Network,
}

impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
impl<B: Deref<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
/// Creates a new poller for the given block source.
///
/// If the `network` parameter is mainnet, then the difficulty between blocks is checked for
Expand All @@ -185,8 +185,8 @@ impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
}
}

impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
impl<B: Deref<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>
{
Box::pin(async move {
Expand All @@ -206,7 +206,7 @@ impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for Chain
})
}

fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>
{
Box::pin(async move {
Expand All @@ -225,7 +225,7 @@ impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for Chain
})
}

fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>
{
Box::pin(async move {
Expand All @@ -249,7 +249,7 @@ mod tests {
let best_known_chain_tip = chain.tip();
chain.disconnect_tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Transient);
Expand All @@ -261,10 +261,10 @@ mod tests {

#[tokio::test]
async fn poll_chain_without_headers() {
let mut chain = Blockchain::default().with_height(1).without_headers();
let chain = Blockchain::default().with_height(1).without_headers();
let best_known_chain_tip = chain.at_height(0);

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -283,7 +283,7 @@ mod tests {
chain.blocks.last_mut().unwrap().header.bits =
BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0; 32]));

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -295,10 +295,10 @@ mod tests {

#[tokio::test]
async fn poll_chain_with_malformed_headers() {
let mut chain = Blockchain::default().with_height(1).malformed_headers();
let chain = Blockchain::default().with_height(1).malformed_headers();
let best_known_chain_tip = chain.at_height(0);

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -310,10 +310,10 @@ mod tests {

#[tokio::test]
async fn poll_chain_with_common_tip() {
let mut chain = Blockchain::default().with_height(0);
let chain = Blockchain::default().with_height(0);
let best_known_chain_tip = chain.tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Common),
Expand All @@ -330,7 +330,7 @@ mod tests {
let worse_chain_tip = chain.tip();
assert_eq!(best_known_chain_tip.chainwork, worse_chain_tip.chainwork);

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
Expand All @@ -345,7 +345,7 @@ mod tests {
chain.disconnect_tip();
let worse_chain_tip = chain.tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
Expand All @@ -354,12 +354,12 @@ mod tests {

#[tokio::test]
async fn poll_chain_with_better_tip() {
let mut chain = Blockchain::default().with_height(1);
let chain = Blockchain::default().with_height(1);
let best_known_chain_tip = chain.at_height(0);

let better_chain_tip = chain.tip();

let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Better(better_chain_tip)),
Expand Down
22 changes: 12 additions & 10 deletions lightning-block-sync/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,51 @@ use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;

use futures::lock::Mutex;

use std::convert::TryFrom;
use std::convert::TryInto;

/// A simple REST client for requesting resources using HTTP `GET`.
pub struct RestClient {
endpoint: HttpEndpoint,
client: HttpClient,
client: Mutex<HttpClient>,
}

impl RestClient {
/// Creates a new REST client connected to the given endpoint.
///
/// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest).
pub fn new(endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = HttpClient::connect(&endpoint)?;
let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self { endpoint, client })
}

/// Requests a resource encoded in `F` format and interpreted as type `T`.
pub async fn request_resource<F, T>(&mut self, resource_path: &str) -> std::io::Result<T>
pub async fn request_resource<F, T>(&self, resource_path: &str) -> std::io::Result<T>
where F: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path);
self.client.get::<F>(&uri, &host).await?.try_into()
self.client.lock().await.get::<F>(&uri, &host).await?.try_into()
}
}

impl BlockSource for RestClient {
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let resource_path = format!("headers/1/{}.json", header_hash.to_hex());
Ok(self.request_resource::<JsonResponse, _>(&resource_path).await?)
})
}

fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let resource_path = format!("block/{}.bin", header_hash.to_hex());
Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
})
}

fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.request_resource::<JsonResponse, _>("chaininfo.json").await?)
})
Expand Down Expand Up @@ -81,7 +83,7 @@ mod tests {
#[tokio::test]
async fn request_unknown_resource() {
let server = HttpServer::responding_with_not_found();
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();

match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
Expand All @@ -92,7 +94,7 @@ mod tests {
#[tokio::test]
async fn request_malformed_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content("foo"));
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();

match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
Expand All @@ -103,7 +105,7 @@ mod tests {
#[tokio::test]
async fn request_valid_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content(42));
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();

match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => panic!("Unexpected error: {:?}", e),
Expand Down
Loading