Skip to content

Commit 9df0066

Browse files
committed
Add ChainNotifier and define ChainListener trait
Add an interface for being notified of block connected and disconnected events, along with a notifier for generating such events. Used while polling block sources for a new tip in order to feed these events into ChannelManager and ChainMonitor.
1 parent f0fde06 commit 9df0066

File tree

3 files changed

+421
-4
lines changed

3 files changed

+421
-4
lines changed

lightning-block-sync/src/lib.rs

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ mod test_utils;
3131
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
3232
mod utils;
3333

34+
use crate::poll::{Poll, ValidatedBlockHeader};
35+
3436
use bitcoin::blockdata::block::{Block, BlockHeader};
3537
use bitcoin::hash_types::BlockHash;
3638
use bitcoin::util::uint::Uint256;
@@ -130,3 +132,319 @@ pub struct BlockHeaderData {
130132
/// of equivalent weight.
131133
pub chainwork: Uint256,
132134
}
135+
136+
/// Adaptor used for notifying when blocks have been connected or disconnected from the chain.
137+
///
138+
/// Used when needing to replay chain data upon startup or as new chain events occur.
139+
pub trait ChainListener {
140+
/// Notifies the listener that a block was added at the given height.
141+
fn block_connected(&mut self, block: &Block, height: u32);
142+
143+
/// Notifies the listener that a block was removed at the given height.
144+
fn block_disconnected(&mut self, header: &BlockHeader, height: u32);
145+
}
146+
147+
/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
148+
/// keyed by block hash.
149+
///
150+
/// Used by [`ChainNotifier`] to store headers along the best chain, which is important for ensuring
151+
/// that blocks can be disconnected if they are no longer accessible from a block source (e.g., if
152+
/// the block source does not store stale forks indefinitely).
153+
///
154+
/// Implementations may define how long to retain headers such that it's unlikely they will ever be
155+
/// needed to disconnect a block. In cases where block sources provide access to headers on stale
156+
/// forks reliably, caches may be entirely unnecessary.
157+
///
158+
/// [`ChainNotifier`]: struct.ChainNotifier.html
159+
pub trait Cache {
160+
/// Retrieves the block header keyed by the given block hash.
161+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>;
162+
163+
/// Called when a block has been connected to the best chain to ensure it is available to be
164+
/// disconnected later if needed.
165+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);
166+
167+
/// Called when a block has been disconnected from the best chain. Once disconnected, a block's
168+
/// header is no longer needed and thus can be removed.
169+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader>;
170+
}
171+
172+
/// Unbounded cache of block headers keyed by block hash.
173+
pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>;
174+
175+
impl Cache for UnboundedCache {
176+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
177+
self.get(block_hash)
178+
}
179+
180+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
181+
self.insert(block_hash, block_header);
182+
}
183+
184+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
185+
self.remove(block_hash)
186+
}
187+
}
188+
189+
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
190+
///
191+
/// [listeners]: trait.ChainListener.html
192+
struct ChainNotifier<C: Cache> {
193+
/// Cache for looking up headers before fetching from a block source.
194+
header_cache: C,
195+
}
196+
197+
/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
198+
/// to another.
199+
///
200+
/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order
201+
/// before new blocks are connected in reverse order.
202+
struct ChainDifference {
203+
/// Blocks that were disconnected from the chain since the last poll.
204+
disconnected_blocks: Vec<ValidatedBlockHeader>,
205+
206+
/// Blocks that were connected to the chain since the last poll.
207+
connected_blocks: Vec<ValidatedBlockHeader>,
208+
}
209+
210+
impl<C: Cache> ChainNotifier<C> {
211+
/// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from
212+
/// `old_header` to get to that point and then connecting blocks until `new_header`.
213+
///
214+
/// Validates headers along the transition path, but doesn't fetch blocks until the chain is
215+
/// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip
216+
/// ended up which may not be `new_header`. Note that iff the returned `Err` contains `Some`
217+
/// header then the transition from `old_header` to `new_header` is valid.
218+
async fn sync_listener<L: ChainListener, P: Poll>(
219+
&mut self,
220+
new_header: ValidatedBlockHeader,
221+
old_header: &ValidatedBlockHeader,
222+
chain_poller: &mut P,
223+
chain_listener: &mut L,
224+
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
225+
let mut difference = self.find_difference(new_header, old_header, chain_poller).await
226+
.map_err(|e| (e, None))?;
227+
228+
let mut new_tip = *old_header;
229+
for header in difference.disconnected_blocks.drain(..) {
230+
println!("Disconnecting block {}", header.block_hash);
231+
if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) {
232+
assert_eq!(cached_header, header);
233+
}
234+
chain_listener.block_disconnected(&header.header, header.height);
235+
new_tip = header;
236+
}
237+
238+
for header in difference.connected_blocks.drain(..).rev() {
239+
let block = chain_poller
240+
.fetch_block(&header).await
241+
.or_else(|e| Err((e, Some(new_tip))))?;
242+
debug_assert_eq!(block.block_hash, header.block_hash);
243+
244+
println!("Connecting block {}", header.block_hash);
245+
self.header_cache.block_connected(header.block_hash, header);
246+
chain_listener.block_connected(&block, header.height);
247+
new_tip = header;
248+
}
249+
250+
Ok(())
251+
}
252+
253+
/// Returns the changes needed to produce the chain with `current_header` as its tip from the
254+
/// chain with `prev_header` as its tip.
255+
///
256+
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
257+
async fn find_difference<P: Poll>(
258+
&self,
259+
current_header: ValidatedBlockHeader,
260+
prev_header: &ValidatedBlockHeader,
261+
chain_poller: &mut P,
262+
) -> BlockSourceResult<ChainDifference> {
263+
let mut disconnected_blocks = Vec::new();
264+
let mut connected_blocks = Vec::new();
265+
let mut current = current_header;
266+
let mut previous = *prev_header;
267+
loop {
268+
// Found the common ancestor.
269+
if current.block_hash == previous.block_hash {
270+
break;
271+
}
272+
273+
// Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
274+
// the header with the greater height, or both if equal heights.
275+
let current_height = current.height;
276+
let previous_height = previous.height;
277+
if current_height <= previous_height {
278+
disconnected_blocks.push(previous);
279+
previous = self.look_up_previous_header(chain_poller, &previous).await?;
280+
}
281+
if current_height >= previous_height {
282+
connected_blocks.push(current);
283+
current = self.look_up_previous_header(chain_poller, &current).await?;
284+
}
285+
}
286+
287+
Ok(ChainDifference { disconnected_blocks, connected_blocks })
288+
}
289+
290+
/// Returns the previous header for the given header, either by looking it up in the cache or
291+
/// fetching it if not found.
292+
async fn look_up_previous_header<P: Poll>(
293+
&self,
294+
chain_poller: &mut P,
295+
header: &ValidatedBlockHeader,
296+
) -> BlockSourceResult<ValidatedBlockHeader> {
297+
match self.header_cache.look_up(&header.header.prev_blockhash) {
298+
Some(prev_header) => Ok(*prev_header),
299+
None => chain_poller.look_up_previous_header(header).await,
300+
}
301+
}
302+
}
303+
304+
#[cfg(test)]
305+
mod chain_notifier_tests {
306+
use crate::test_utils::{Blockchain, MockChainListener};
307+
use super::*;
308+
309+
use bitcoin::network::constants::Network;
310+
311+
#[tokio::test]
312+
async fn sync_from_same_chain() {
313+
let mut chain = Blockchain::default().with_height(3);
314+
315+
let new_tip = chain.tip();
316+
let old_tip = chain.at_height(1);
317+
let mut listener = MockChainListener::new()
318+
.expect_block_connected(*chain.at_height(2))
319+
.expect_block_connected(*new_tip);
320+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) };
321+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
322+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
323+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
324+
Ok(_) => {},
325+
}
326+
}
327+
328+
#[tokio::test]
329+
async fn sync_from_different_chains() {
330+
let mut test_chain = Blockchain::with_network(Network::Testnet).with_height(1);
331+
let main_chain = Blockchain::with_network(Network::Bitcoin).with_height(1);
332+
333+
let new_tip = test_chain.tip();
334+
let old_tip = main_chain.tip();
335+
let mut listener = MockChainListener::new();
336+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=1) };
337+
let mut poller = poll::ChainPoller::new(&mut test_chain, Network::Testnet);
338+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
339+
Err((e, _)) => {
340+
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
341+
assert_eq!(e.into_inner().as_ref().to_string(), "genesis block reached");
342+
},
343+
Ok(_) => panic!("Expected error"),
344+
}
345+
}
346+
347+
#[tokio::test]
348+
async fn sync_from_equal_length_fork() {
349+
let main_chain = Blockchain::default().with_height(2);
350+
let mut fork_chain = main_chain.fork_at_height(1);
351+
352+
let new_tip = fork_chain.tip();
353+
let old_tip = main_chain.tip();
354+
let mut listener = MockChainListener::new()
355+
.expect_block_disconnected(*old_tip)
356+
.expect_block_connected(*new_tip);
357+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) };
358+
let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
359+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
360+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
361+
Ok(_) => {},
362+
}
363+
}
364+
365+
#[tokio::test]
366+
async fn sync_from_shorter_fork() {
367+
let main_chain = Blockchain::default().with_height(3);
368+
let mut fork_chain = main_chain.fork_at_height(1);
369+
fork_chain.disconnect_tip();
370+
371+
let new_tip = fork_chain.tip();
372+
let old_tip = main_chain.tip();
373+
let mut listener = MockChainListener::new()
374+
.expect_block_disconnected(*old_tip)
375+
.expect_block_disconnected(*main_chain.at_height(2))
376+
.expect_block_connected(*new_tip);
377+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=3) };
378+
let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
379+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
380+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
381+
Ok(_) => {},
382+
}
383+
}
384+
385+
#[tokio::test]
386+
async fn sync_from_longer_fork() {
387+
let mut main_chain = Blockchain::default().with_height(3);
388+
let mut fork_chain = main_chain.fork_at_height(1);
389+
main_chain.disconnect_tip();
390+
391+
let new_tip = fork_chain.tip();
392+
let old_tip = main_chain.tip();
393+
let mut listener = MockChainListener::new()
394+
.expect_block_disconnected(*old_tip)
395+
.expect_block_connected(*fork_chain.at_height(2))
396+
.expect_block_connected(*new_tip);
397+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) };
398+
let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
399+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
400+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
401+
Ok(_) => {},
402+
}
403+
}
404+
405+
#[tokio::test]
406+
async fn sync_from_chain_without_headers() {
407+
let mut chain = Blockchain::default().with_height(3).without_headers();
408+
409+
let new_tip = chain.tip();
410+
let old_tip = chain.at_height(1);
411+
let mut listener = MockChainListener::new();
412+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) };
413+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
414+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
415+
Err((_, tip)) => assert_eq!(tip, None),
416+
Ok(_) => panic!("Expected error"),
417+
}
418+
}
419+
420+
#[tokio::test]
421+
async fn sync_from_chain_without_any_new_blocks() {
422+
let mut chain = Blockchain::default().with_height(3).without_blocks(2..);
423+
424+
let new_tip = chain.tip();
425+
let old_tip = chain.at_height(1);
426+
let mut listener = MockChainListener::new();
427+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) };
428+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
429+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
430+
Err((_, tip)) => assert_eq!(tip, Some(old_tip)),
431+
Ok(_) => panic!("Expected error"),
432+
}
433+
}
434+
435+
#[tokio::test]
436+
async fn sync_from_chain_without_some_new_blocks() {
437+
let mut chain = Blockchain::default().with_height(3).without_blocks(3..);
438+
439+
let new_tip = chain.tip();
440+
let old_tip = chain.at_height(1);
441+
let mut listener = MockChainListener::new()
442+
.expect_block_connected(*chain.at_height(2));
443+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) };
444+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
445+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
446+
Err((_, tip)) => assert_eq!(tip, Some(chain.at_height(2))),
447+
Ok(_) => panic!("Expected error"),
448+
}
449+
}
450+
}

lightning-block-sync/src/poll.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl Validate for Block {
8989
/// A block header with validated proof of work and corresponding block hash.
9090
#[derive(Clone, Copy, Debug, PartialEq)]
9191
pub struct ValidatedBlockHeader {
92-
block_hash: BlockHash,
92+
pub(crate) block_hash: BlockHash,
9393
inner: BlockHeaderData,
9494
}
9595

@@ -135,7 +135,7 @@ impl ValidatedBlockHeader {
135135

136136
/// A block with validated data against its transaction list and corresponding block hash.
137137
pub struct ValidatedBlock {
138-
block_hash: BlockHash,
138+
pub(crate) block_hash: BlockHash,
139139
inner: Block,
140140
}
141141

0 commit comments

Comments
 (0)