Skip to content

Commit eefb2dd

Browse files
committed
Add ChainNotifier and define ChainListener trait
Add an interface for being notified of block connected and disconnected events, along with a notifier for generating such events. Used while polling block sources for a new tip in order to feed these events into ChannelManager and ChainMonitor.
1 parent 7f2ec42 commit eefb2dd

File tree

3 files changed

+438
-4
lines changed

3 files changed

+438
-4
lines changed

lightning-block-sync/src/lib.rs

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ mod test_utils;
3131
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
3232
mod utils;
3333

34+
use crate::poll::{Poll, ValidatedBlockHeader};
35+
3436
use bitcoin::blockdata::block::{Block, BlockHeader};
3537
use bitcoin::hash_types::BlockHash;
3638
use bitcoin::util::uint::Uint256;
@@ -130,3 +132,336 @@ pub struct BlockHeaderData {
130132
/// of equivalent weight.
131133
pub chainwork: Uint256,
132134
}
135+
136+
/// Adaptor used for notifying when blocks have been connected or disconnected from the chain.
137+
///
138+
/// Used when needing to replay chain data upon startup or as new chain events occur.
139+
pub trait ChainListener {
140+
/// Notifies the listener that a block was added at the given height.
141+
fn block_connected(&mut self, block: &Block, height: u32);
142+
143+
/// Notifies the listener that a block was removed at the given height.
144+
fn block_disconnected(&mut self, header: &BlockHeader, height: u32);
145+
}
146+
147+
/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
148+
/// keyed by block hash.
149+
///
150+
/// Used by [`ChainNotifier`] to store headers along the best chain. Implementations may define
151+
/// their own cache eviction policy.
152+
///
153+
/// [`ChainNotifier`]: struct.ChainNotifier.html
154+
pub trait Cache {
155+
/// Retrieves the block header keyed by the given block hash.
156+
fn get(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>;
157+
158+
/// Inserts a block header keyed by the given block hash.
159+
fn insert(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);
160+
161+
/// Removes the block header keyed by the given block hash.
162+
fn remove(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader>;
163+
}
164+
165+
/// Unbounded cache of block headers keyed by block hash.
166+
pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>;
167+
168+
impl Cache for UnboundedCache {
169+
fn get(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
170+
self.get(block_hash)
171+
}
172+
173+
fn insert(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
174+
self.insert(block_hash, block_header);
175+
}
176+
177+
fn remove(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
178+
self.remove(block_hash)
179+
}
180+
}
181+
182+
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
183+
///
184+
/// [listeners]: trait.ChainListener.html
185+
struct ChainNotifier<C: Cache> {
186+
/// Cache for looking up headers before fetching from a block source.
187+
header_cache: C,
188+
}
189+
190+
/// Steps outlining changes needed to be made to the chain in order to transform it from having one
191+
/// chain tip to another.
192+
enum ForkStep {
193+
ForkPoint(ValidatedBlockHeader),
194+
DisconnectBlock(ValidatedBlockHeader),
195+
ConnectBlock(ValidatedBlockHeader),
196+
}
197+
198+
impl<C: Cache> ChainNotifier<C> {
199+
/// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from
200+
/// `old_header` to get to that point and then connecting blocks until `new_header`.
201+
///
202+
/// Validates headers along the transition path, but doesn't fetch blocks until the chain is
203+
/// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip
204+
/// ended up which may not be `new_header`. Note that iff the returned `Err` contains `Some`
205+
/// header then the transition from `old_header` to `new_header` is valid.
206+
async fn sync_listener<L: ChainListener, P: Poll>(
207+
&mut self,
208+
new_header: ValidatedBlockHeader,
209+
old_header: &ValidatedBlockHeader,
210+
chain_poller: &mut P,
211+
chain_listener: &mut L,
212+
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
213+
let mut events = self.find_fork(new_header, old_header, chain_poller).await.map_err(|e| (e, None))?;
214+
215+
let mut last_disconnect_tip = None;
216+
let mut new_tip = None;
217+
for event in events.iter() {
218+
match &event {
219+
&ForkStep::DisconnectBlock(ref header) => {
220+
println!("Disconnecting block {}", header.block_hash);
221+
if let Some(cached_header) = self.header_cache.remove(&header.block_hash) {
222+
assert_eq!(cached_header, *header);
223+
}
224+
chain_listener.block_disconnected(&header.header, header.height);
225+
last_disconnect_tip = Some(header.header.prev_blockhash);
226+
},
227+
&ForkStep::ForkPoint(ref header) => {
228+
new_tip = Some(*header);
229+
},
230+
_ => {},
231+
}
232+
}
233+
234+
// If blocks were disconnected, new blocks will connect starting from the fork point.
235+
// Otherwise, there was no fork, so new blocks connect starting from the old tip.
236+
assert_eq!(last_disconnect_tip.is_some(), new_tip.is_some());
237+
if let &Some(ref tip_header) = &new_tip {
238+
debug_assert_eq!(tip_header.header.block_hash(), *last_disconnect_tip.as_ref().unwrap());
239+
} else {
240+
new_tip = Some(*old_header);
241+
}
242+
243+
for event in events.drain(..).rev() {
244+
if let ForkStep::ConnectBlock(header) = event {
245+
let block = chain_poller
246+
.fetch_block(&header).await
247+
.or_else(|e| Err((e, new_tip)))?;
248+
debug_assert_eq!(block.block_hash, header.block_hash);
249+
250+
println!("Connecting block {}", header.block_hash);
251+
self.header_cache.insert(header.block_hash, header);
252+
chain_listener.block_connected(&block, header.height);
253+
new_tip = Some(header);
254+
}
255+
}
256+
Ok(())
257+
}
258+
259+
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
260+
/// Returns the steps needed to produce the chain with `current_header` as its tip from the
261+
/// chain with `prev_header` as its tip. There is no ordering guarantee between different
262+
/// `ForkStep` types, but `DisconnectBlock` and `ConnectBlock` are each returned in
263+
/// height-descending order.
264+
async fn find_fork<P: Poll>(
265+
&self,
266+
current_header: ValidatedBlockHeader,
267+
prev_header: &ValidatedBlockHeader,
268+
chain_poller: &mut P,
269+
) -> BlockSourceResult<Vec<ForkStep>> {
270+
let mut steps = Vec::new();
271+
let mut current = current_header;
272+
let mut previous = *prev_header;
273+
loop {
274+
// Found the parent block.
275+
if current.height == previous.height + 1 &&
276+
current.header.prev_blockhash == previous.block_hash {
277+
steps.push(ForkStep::ConnectBlock(current));
278+
break;
279+
}
280+
281+
// Found a chain fork.
282+
if current.header.prev_blockhash == previous.header.prev_blockhash {
283+
let fork_point = self.look_up_previous_header(chain_poller, &previous).await?;
284+
steps.push(ForkStep::DisconnectBlock(previous));
285+
steps.push(ForkStep::ConnectBlock(current));
286+
steps.push(ForkStep::ForkPoint(fork_point));
287+
break;
288+
}
289+
290+
// Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
291+
// the header with the greater height, or both if equal heights.
292+
let current_height = current.height;
293+
let previous_height = previous.height;
294+
if current_height <= previous_height {
295+
steps.push(ForkStep::DisconnectBlock(previous));
296+
previous = self.look_up_previous_header(chain_poller, &previous).await?;
297+
}
298+
if current_height >= previous_height {
299+
steps.push(ForkStep::ConnectBlock(current));
300+
current = self.look_up_previous_header(chain_poller, &current).await?;
301+
}
302+
}
303+
304+
Ok(steps)
305+
}
306+
307+
/// Returns the previous header for the given header, either by looking it up in the cache or
308+
/// fetching it if not found.
309+
async fn look_up_previous_header<P: Poll>(
310+
&self,
311+
chain_poller: &mut P,
312+
header: &ValidatedBlockHeader,
313+
) -> BlockSourceResult<ValidatedBlockHeader> {
314+
match self.header_cache.get(&header.header.prev_blockhash) {
315+
Some(prev_header) => Ok(*prev_header),
316+
None => chain_poller.look_up_previous_header(header).await,
317+
}
318+
}
319+
}
320+
321+
#[cfg(test)]
322+
mod chain_notifier_tests {
323+
use crate::test_utils::{Blockchain, MockChainListener};
324+
use super::*;
325+
326+
use bitcoin::network::constants::Network;
327+
328+
#[tokio::test]
329+
async fn sync_from_same_chain() {
330+
let mut chain = Blockchain::default().with_height(3);
331+
332+
let new_tip = chain.tip();
333+
let old_tip = chain.at_height(1);
334+
let mut listener = MockChainListener::new()
335+
.expect_block_connected(*chain.at_height(2))
336+
.expect_block_connected(*new_tip);
337+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) };
338+
let mut poller = poll::ChainPoller::new(&mut chain as &mut dyn BlockSource, Network::Testnet);
339+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
340+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
341+
Ok(_) => {},
342+
}
343+
}
344+
345+
#[tokio::test]
346+
async fn sync_from_different_chains() {
347+
let mut test_chain = Blockchain::with_network(Network::Testnet).with_height(1);
348+
let main_chain = Blockchain::with_network(Network::Bitcoin).with_height(1);
349+
350+
let new_tip = test_chain.tip();
351+
let old_tip = main_chain.tip();
352+
let mut listener = MockChainListener::new();
353+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=1) };
354+
let mut poller = poll::ChainPoller::new(&mut test_chain as &mut dyn BlockSource, Network::Testnet);
355+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
356+
Err((e, _)) => {
357+
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
358+
assert_eq!(e.into_inner().as_ref().to_string(), "genesis block reached");
359+
},
360+
Ok(_) => panic!("Expected error"),
361+
}
362+
}
363+
364+
#[tokio::test]
365+
async fn sync_from_equal_length_fork() {
366+
let main_chain = Blockchain::default().with_height(2);
367+
let mut fork_chain = main_chain.fork_at_height(1);
368+
369+
let new_tip = fork_chain.tip();
370+
let old_tip = main_chain.tip();
371+
let mut listener = MockChainListener::new()
372+
.expect_block_disconnected(*old_tip)
373+
.expect_block_connected(*new_tip);
374+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) };
375+
let mut poller = poll::ChainPoller::new(&mut fork_chain as &mut dyn BlockSource, Network::Testnet);
376+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
377+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
378+
Ok(_) => {},
379+
}
380+
}
381+
382+
#[tokio::test]
383+
async fn sync_from_shorter_fork() {
384+
let main_chain = Blockchain::default().with_height(3);
385+
let mut fork_chain = main_chain.fork_at_height(1);
386+
fork_chain.disconnect_tip();
387+
388+
let new_tip = fork_chain.tip();
389+
let old_tip = main_chain.tip();
390+
let mut listener = MockChainListener::new()
391+
.expect_block_disconnected(*old_tip)
392+
.expect_block_disconnected(*main_chain.at_height(2))
393+
.expect_block_connected(*new_tip);
394+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=3) };
395+
let mut poller = poll::ChainPoller::new(&mut fork_chain as &mut dyn BlockSource, Network::Testnet);
396+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
397+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
398+
Ok(_) => {},
399+
}
400+
}
401+
402+
#[tokio::test]
403+
async fn sync_from_longer_fork() {
404+
let mut main_chain = Blockchain::default().with_height(3);
405+
let mut fork_chain = main_chain.fork_at_height(1);
406+
main_chain.disconnect_tip();
407+
408+
let new_tip = fork_chain.tip();
409+
let old_tip = main_chain.tip();
410+
let mut listener = MockChainListener::new()
411+
.expect_block_disconnected(*old_tip)
412+
.expect_block_connected(*fork_chain.at_height(2))
413+
.expect_block_connected(*new_tip);
414+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) };
415+
let mut poller = poll::ChainPoller::new(&mut fork_chain as &mut dyn BlockSource, Network::Testnet);
416+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
417+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
418+
Ok(_) => {},
419+
}
420+
}
421+
422+
#[tokio::test]
423+
async fn sync_from_chain_without_headers() {
424+
let mut chain = Blockchain::default().with_height(3).without_headers();
425+
426+
let new_tip = chain.tip();
427+
let old_tip = chain.at_height(1);
428+
let mut listener = MockChainListener::new();
429+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) };
430+
let mut poller = poll::ChainPoller::new(&mut chain as &mut dyn BlockSource, Network::Testnet);
431+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
432+
Err((_, tip)) => assert_eq!(tip, None),
433+
Ok(_) => panic!("Expected error"),
434+
}
435+
}
436+
437+
#[tokio::test]
438+
async fn sync_from_chain_without_any_new_blocks() {
439+
let mut chain = Blockchain::default().with_height(3).without_blocks(2..);
440+
441+
let new_tip = chain.tip();
442+
let old_tip = chain.at_height(1);
443+
let mut listener = MockChainListener::new();
444+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) };
445+
let mut poller = poll::ChainPoller::new(&mut chain as &mut dyn BlockSource, Network::Testnet);
446+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
447+
Err((_, tip)) => assert_eq!(tip, Some(old_tip)),
448+
Ok(_) => panic!("Expected error"),
449+
}
450+
}
451+
452+
#[tokio::test]
453+
async fn sync_from_chain_without_some_new_blocks() {
454+
let mut chain = Blockchain::default().with_height(3).without_blocks(3..);
455+
456+
let new_tip = chain.tip();
457+
let old_tip = chain.at_height(1);
458+
let mut listener = MockChainListener::new()
459+
.expect_block_connected(*chain.at_height(2));
460+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) };
461+
let mut poller = poll::ChainPoller::new(&mut chain as &mut dyn BlockSource, Network::Testnet);
462+
match notifier.sync_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
463+
Err((_, tip)) => assert_eq!(tip, Some(chain.at_height(2))),
464+
Ok(_) => panic!("Expected error"),
465+
}
466+
}
467+
}

lightning-block-sync/src/poll.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl Validate for Block {
8585
/// A block header with validated proof of work and corresponding block hash.
8686
#[derive(Clone, Copy, Debug, PartialEq)]
8787
pub struct ValidatedBlockHeader {
88-
block_hash: BlockHash,
88+
pub(crate) block_hash: BlockHash,
8989
inner: BlockHeaderData,
9090
}
9191

@@ -131,7 +131,7 @@ impl ValidatedBlockHeader {
131131

132132
/// A block with validated data against its transaction list and corresponding block hash.
133133
pub struct ValidatedBlock {
134-
block_hash: BlockHash,
134+
pub(crate) block_hash: BlockHash,
135135
inner: Block,
136136
}
137137

0 commit comments

Comments
 (0)