Skip to content

Commit 525365d

Browse files
committed
[ETCM-247] Fix Fetcher updates
- Fetcher will update last block and know top(only if is needed) if a mined or checkpointed block was imported - Fetcher will update know top if detect a block header with better top fix
1 parent cd5ae33 commit 525365d

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class BlockFetcher(
5050
private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
5151
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
5252
peerEventBus ! Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers))
53+
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.AllPeers))
5354
}
5455

5556
def handleCommonMessages(state: Option[BlockFetcherState]): Receive = { case PrintStatus =>
@@ -63,7 +64,8 @@ class BlockFetcher(
6364
handleNewBlockMessages(state) orElse
6465
handleHeadersMessages(state) orElse
6566
handleBodiesMessages(state) orElse
66-
handleStateNodeMessages(state)
67+
handleStateNodeMessages(state) orElse
68+
handlePossibleTopUpdate(state)
6769

6870
private def handleCommands(state: BlockFetcherState): Receive = {
6971
case PickBlocks(amount) => state.pickBlocks(amount) |> handlePickedBlocks(state) |> fetchBlocks
@@ -183,6 +185,21 @@ class BlockFetcher(
183185
fetchBlocks(newState)
184186
}
185187

188+
private def handlePossibleTopUpdate(state: BlockFetcherState): Receive = {
189+
case MessageFromPeer(BlockHeaders(headers), _) =>
190+
headers.headOption.map { bh =>
191+
log.debug(s"Possible new top at block ${bh.number}")
192+
val newState = state.withPossibleNewTopAt(bh.number)
193+
fetchBlocks(newState)
194+
}
195+
//keep fetcher state updated in case new checkpoint block or mined block was imported
196+
case LastBlockChanged(blockNr) => {
197+
log.debug(s"New last block $blockNr imported from the inside")
198+
val newState = state.withLastBlock(blockNr).withPossibleNewTopAt(blockNr)
199+
fetchBlocks(newState)
200+
}
201+
}
202+
186203
private def handlePickedBlocks(
187204
state: BlockFetcherState
188205
)(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState =
@@ -306,6 +323,7 @@ object BlockFetcher {
306323
new InvalidateBlocksFrom(from, reason, toBlacklist)
307324
}
308325
case class BlockImportFailed(blockNr: BigInt, reason: String) extends FetchMsg
326+
case class LastBlockChanged(blockNr: BigInt) extends FetchMsg
309327
case object RetryBodiesRequest extends FetchMsg
310328
case object RetryHeadersRequest extends FetchMsg
311329

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,15 +201,35 @@ class BlockImporter(
201201
}
202202

203203
private def importMinedBlock(block: Block, state: ImporterState): Unit =
204-
importBlock(block, new MinedBlockImportMessages(block), informFetcherOnFail = false)(state)
204+
importBlock(
205+
block,
206+
new MinedBlockImportMessages(block),
207+
informFetcherOnFail = false,
208+
informFetcherOnLastBlockChanged = true
209+
)(state)
205210

206211
private def importCheckpointBlock(block: Block, state: ImporterState): Unit =
207-
importBlock(block, new CheckpointBlockImportMessages(block), informFetcherOnFail = false)(state)
212+
importBlock(
213+
block,
214+
new CheckpointBlockImportMessages(block),
215+
informFetcherOnFail = false,
216+
informFetcherOnLastBlockChanged = true
217+
)(state)
208218

209219
private def importNewBlock(block: Block, peerId: PeerId, state: ImporterState): Unit =
210-
importBlock(block, new NewBlockImportMessages(block, peerId), informFetcherOnFail = true)(state)
211-
212-
private def importBlock(block: Block, importMessages: ImportMessages, informFetcherOnFail: Boolean): ImportFn = {
220+
importBlock(
221+
block,
222+
new NewBlockImportMessages(block, peerId),
223+
informFetcherOnFail = true,
224+
informFetcherOnLastBlockChanged = false
225+
)(state)
226+
227+
private def importBlock(
228+
block: Block,
229+
importMessages: ImportMessages,
230+
informFetcherOnFail: Boolean,
231+
informFetcherOnLastBlockChanged: Boolean
232+
): ImportFn = {
213233
def doLog(entry: ImportMessages.LogEntry): Unit = log.log(entry._1, entry._2)
214234

215235
importWith {
@@ -222,6 +242,9 @@ class BlockImporter(
222242
val (blocks, tds) = importedBlocksData.map(data => (data.block, data.td)).unzip
223243
broadcastBlocks(blocks, tds)
224244
updateTxPool(importedBlocksData.map(_.block), Seq.empty)
245+
if (informFetcherOnLastBlockChanged) {
246+
fetcher ! BlockFetcher.LastBlockChanged(blocks.last.number)
247+
}
225248

226249
case BlockEnqueued => ()
227250

@@ -232,6 +255,9 @@ class BlockImporter(
232255
case ChainReorganised(oldBranch, newBranch, totalDifficulties) =>
233256
updateTxPool(newBranch, oldBranch)
234257
broadcastBlocks(newBranch, totalDifficulties)
258+
if (informFetcherOnLastBlockChanged) {
259+
fetcher ! BlockFetcher.LastBlockChanged(newBranch.last.number)
260+
}
235261

236262
case BlockImportFailed(error) =>
237263
if (informFetcherOnFail) {

0 commit comments

Comments
 (0)