Skip to content

Commit 5fda427

Browse files
committed
[ETCM-247] Fix Fetcher updates
- Fetcher will update last block and know top(only if is needed) if a mined or checkpointed block was imported - Fetcher will update know top if detect a block header with better top
1 parent cd5ae33 commit 5fda427

File tree

2 files changed

+52
-6
lines changed

2 files changed

+52
-6
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class BlockFetcher(
5050
private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
5151
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
5252
peerEventBus ! Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers))
53+
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.AllPeers))
5354
}
5455

5556
def handleCommonMessages(state: Option[BlockFetcherState]): Receive = { case PrintStatus =>
@@ -63,7 +64,8 @@ class BlockFetcher(
6364
handleNewBlockMessages(state) orElse
6465
handleHeadersMessages(state) orElse
6566
handleBodiesMessages(state) orElse
66-
handleStateNodeMessages(state)
67+
handleStateNodeMessages(state) orElse
68+
handlePossibleTopUpdate(state)
6769

6870
private def handleCommands(state: BlockFetcherState): Receive = {
6971
case PickBlocks(amount) => state.pickBlocks(amount) |> handlePickedBlocks(state) |> fetchBlocks
@@ -183,6 +185,23 @@ class BlockFetcher(
183185
fetchBlocks(newState)
184186
}
185187

188+
private def handlePossibleTopUpdate(state: BlockFetcherState): Receive = {
189+
//by handling these type of messages, fetcher can received from network, fresh info about blocks on top
190+
//ex. After a successful handshake, fetcher will receive the info about the header of the peer best block
191+
case MessageFromPeer(BlockHeaders(headers), _) =>
192+
headers.lastOption.map { bh =>
193+
log.debug(s"Candidate for new top at block ${bh.number}, current know top ${state.knownTop}")
194+
val newState = state.withPossibleNewTopAt(bh.number)
195+
fetchBlocks(newState)
196+
}
197+
//keep fetcher state updated in case new checkpoint block or mined block was imported
198+
case LastBlockChanged(blockNr) => {
199+
log.debug(s"New last block $blockNr imported from the inside")
200+
val newState = state.withLastBlock(blockNr).withPossibleNewTopAt(blockNr)
201+
fetchBlocks(newState)
202+
}
203+
}
204+
186205
private def handlePickedBlocks(
187206
state: BlockFetcherState
188207
)(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState =
@@ -306,6 +325,7 @@ object BlockFetcher {
306325
new InvalidateBlocksFrom(from, reason, toBlacklist)
307326
}
308327
case class BlockImportFailed(blockNr: BigInt, reason: String) extends FetchMsg
328+
case class LastBlockChanged(blockNr: BigInt) extends FetchMsg
309329
case object RetryBodiesRequest extends FetchMsg
310330
case object RetryHeadersRequest extends FetchMsg
311331

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,15 +201,35 @@ class BlockImporter(
201201
}
202202

203203
private def importMinedBlock(block: Block, state: ImporterState): Unit =
204-
importBlock(block, new MinedBlockImportMessages(block), informFetcherOnFail = false)(state)
204+
importBlock(
205+
block,
206+
new MinedBlockImportMessages(block),
207+
informFetcherOnFail = false,
208+
informFetcherOnLastBlockChanged = true
209+
)(state)
205210

206211
private def importCheckpointBlock(block: Block, state: ImporterState): Unit =
207-
importBlock(block, new CheckpointBlockImportMessages(block), informFetcherOnFail = false)(state)
212+
importBlock(
213+
block,
214+
new CheckpointBlockImportMessages(block),
215+
informFetcherOnFail = false,
216+
informFetcherOnLastBlockChanged = true
217+
)(state)
208218

209219
private def importNewBlock(block: Block, peerId: PeerId, state: ImporterState): Unit =
210-
importBlock(block, new NewBlockImportMessages(block, peerId), informFetcherOnFail = true)(state)
211-
212-
private def importBlock(block: Block, importMessages: ImportMessages, informFetcherOnFail: Boolean): ImportFn = {
220+
importBlock(
221+
block,
222+
new NewBlockImportMessages(block, peerId),
223+
informFetcherOnFail = true,
224+
informFetcherOnLastBlockChanged = false
225+
)(state)
226+
227+
private def importBlock(
228+
block: Block,
229+
importMessages: ImportMessages,
230+
informFetcherOnFail: Boolean,
231+
informFetcherOnLastBlockChanged: Boolean
232+
): ImportFn = {
213233
def doLog(entry: ImportMessages.LogEntry): Unit = log.log(entry._1, entry._2)
214234

215235
importWith {
@@ -222,6 +242,9 @@ class BlockImporter(
222242
val (blocks, tds) = importedBlocksData.map(data => (data.block, data.td)).unzip
223243
broadcastBlocks(blocks, tds)
224244
updateTxPool(importedBlocksData.map(_.block), Seq.empty)
245+
if (informFetcherOnLastBlockChanged) {
246+
fetcher ! BlockFetcher.LastBlockChanged(blocks.last.number)
247+
}
225248

226249
case BlockEnqueued => ()
227250

@@ -232,6 +255,9 @@ class BlockImporter(
232255
case ChainReorganised(oldBranch, newBranch, totalDifficulties) =>
233256
updateTxPool(newBranch, oldBranch)
234257
broadcastBlocks(newBranch, totalDifficulties)
258+
if (informFetcherOnLastBlockChanged) {
259+
fetcher ! BlockFetcher.LastBlockChanged(newBranch.last.number)
260+
}
235261

236262
case BlockImportFailed(error) =>
237263
if (informFetcherOnFail) {

0 commit comments

Comments
 (0)