Skip to content

Commit 4a0a9b1

Browse files
committed
Implement ChainMultiplexer in terms of ChainPoller
Removes the BlockSource implementation for ChainMultiplexer as it is no longer needed.
1 parent 6d54ffb commit 4a0a9b1

File tree

1 file changed

+15
-50
lines changed

1 file changed

+15
-50
lines changed

lightning-block-sync/src/poller.rs

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, ChainTip, Poll, Validate, ValidatedBlock, ValidatedBlockHeader};
1+
use crate::{AsyncBlockSourceResult, BlockSource, BlockSourceError, ChainTip, Poll, Validate, ValidatedBlock, ValidatedBlockHeader};
22

3-
use bitcoin::blockdata::block::Block;
4-
use bitcoin::hash_types::BlockHash;
53
use bitcoin::network::constants::Network;
64

75
use std::ops::DerefMut;
@@ -73,27 +71,26 @@ impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll fo
7371
}
7472

7573
pub struct ChainMultiplexer<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> {
76-
block_sources: Vec<(B, BlockSourceError)>,
77-
backup_block_sources: Vec<(B, BlockSourceError)>,
74+
block_sources: Vec<(ChainPoller<'b, B>, BlockSourceError)>,
75+
backup_block_sources: Vec<(ChainPoller<'b, B>, BlockSourceError)>,
7876
best_block_source: usize,
79-
network: Network,
8077
}
8178

8279
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> ChainMultiplexer<'b, B> {
8380
pub fn new(mut block_sources: Vec<B>, mut backup_block_sources: Vec<B>, network: Network) -> Self {
8481
assert!(!block_sources.is_empty());
8582
let block_sources = block_sources.drain(..).map(|block_source| {
86-
(block_source, BlockSourceError::Transient)
83+
(ChainPoller::new(block_source, network), BlockSourceError::Transient)
8784
}).collect();
8885

8986
let backup_block_sources = backup_block_sources.drain(..).map(|block_source| {
90-
(block_source, BlockSourceError::Transient)
87+
(ChainPoller::new(block_source, network), BlockSourceError::Transient)
9188
}).collect();
9289

93-
Self { block_sources, backup_block_sources, best_block_source: 0, network }
90+
Self { block_sources, backup_block_sources, best_block_source: 0 }
9491
}
9592

96-
fn best_and_backup_block_sources(&mut self) -> Vec<&mut (B, BlockSourceError)> {
93+
fn best_and_backup_block_sources(&mut self) -> Vec<&mut (ChainPoller<'b, B>, BlockSourceError)> {
9794
let best_block_source = self.block_sources.get_mut(self.best_block_source).unwrap();
9895
let backup_block_sources = self.backup_block_sources.iter_mut();
9996
std::iter::once(best_block_source)
@@ -110,12 +107,11 @@ impl<'b, B: 'b + DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Po
110107
Box::pin(async move {
111108
let mut heaviest_chain_tip = best_chain_tip;
112109
let mut best_result = Err(BlockSourceError::Persistent);
113-
for (i, (block_source, error)) in self.block_sources.iter_mut().enumerate() {
110+
for (i, (poller, error)) in self.block_sources.iter_mut().enumerate() {
114111
if let BlockSourceError::Persistent = error {
115112
continue;
116113
}
117114

118-
let mut poller = ChainPoller::new(&mut **block_source as &mut dyn BlockSource, self.network);
119115
let result = poller.poll_chain_tip(heaviest_chain_tip).await;
120116
match result {
121117
Err(BlockSourceError::Persistent) => {
@@ -152,41 +148,8 @@ impl<'b, B: 'b + DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Po
152148
AsyncBlockSourceResult<'a, ValidatedBlockHeader>
153149
{
154150
Box::pin(async move {
155-
let previous_hash = &header.header.prev_blockhash;
156-
let height = header.height - 1;
157-
self.get_header(previous_hash, Some(height)).await?
158-
.validate(*previous_hash)
159-
})
160-
}
161-
162-
fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
163-
AsyncBlockSourceResult<'a, ValidatedBlock>
164-
{
165-
Box::pin(async move {
166-
self.get_block(&header.block_hash).await?
167-
.validate(header.block_hash)
168-
})
169-
}
170-
}
171-
172-
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> BlockSource for ChainMultiplexer<'b, B> {
173-
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
174-
Box::pin(async move {
175-
for (block_source, error) in self.best_and_backup_block_sources() {
176-
let result = block_source.get_header(header_hash, height).await;
177-
match result {
178-
Err(e) => *error = e,
179-
Ok(_) => return result,
180-
}
181-
}
182-
Err(BlockSourceError::Persistent)
183-
})
184-
}
185-
186-
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
187-
Box::pin(async move {
188-
for (block_source, error) in self.best_and_backup_block_sources() {
189-
let result = block_source.get_block(header_hash).await;
151+
for (poller, error) in self.best_and_backup_block_sources() {
152+
let result = poller.look_up_previous_header(header).await;
190153
match result {
191154
Err(e) => *error = e,
192155
Ok(_) => return result,
@@ -196,10 +159,12 @@ impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> BlockSo
196159
})
197160
}
198161

199-
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
162+
fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
163+
AsyncBlockSourceResult<'a, ValidatedBlock>
164+
{
200165
Box::pin(async move {
201-
for (block_source, error) in self.best_and_backup_block_sources() {
202-
let result = block_source.get_best_block().await;
166+
for (poller, error) in self.best_and_backup_block_sources() {
167+
let result = poller.fetch_block(header).await;
203168
match result {
204169
Err(e) => *error = e,
205170
Ok(_) => return result,

0 commit comments

Comments
 (0)