Skip to content

Commit 70cf7cf

Browse files
authored
ETCM-941: Simplify Block Importer (#1020)
Scalafmt Fix comments
1 parent 17e68c3 commit 70cf7cf

File tree

4 files changed

+31
-77
lines changed

4 files changed

+31
-77
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
1515
HeadersNotFormingSeq,
1616
HeadersNotMatchingReadyBlocks
1717
}
18-
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
18+
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.ImportNewBlock
1919
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
2020
import io.iohk.ethereum.domain._
2121
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
@@ -255,7 +255,6 @@ class BlockFetcher(
255255
.withPeerForBlocks(peerId, Seq(newBlockNr))
256256
.withLastBlock(newBlockNr)
257257
.withKnownTopAt(newBlockNr)
258-
state.importer ! OnTop
259258
state.importer ! ImportNewBlock(block, peerId)
260259
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
261260
processFetchCommands(newState)
@@ -276,9 +275,8 @@ class BlockFetcher(
276275
replyTo: ClassicActorRef
277276
)(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState =
278277
pickResult
279-
.tap { case (blocks, newState) =>
278+
.tap { case (blocks, _) =>
280279
replyTo ! PickedBlocks(blocks)
281-
replyTo ! (if (newState.isOnTop) OnTop else NotOnTop)
282280
}
283281
.fold(state)(_._2)
284282

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 29 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import monix.execution.Scheduler
2424

2525
import scala.concurrent.duration._
2626

27-
// scalastyle:off cyclomatic.complexity
2827
class BlockImporter(
2928
fetcher: ActorRef,
3029
ledger: Ledger,
@@ -53,16 +52,9 @@ class BlockImporter(
5352
start()
5453
}
5554

56-
private def handleTopMessages(state: ImporterState, currentBehavior: Behavior): Receive = {
57-
case OnTop => context become currentBehavior(state.onTop())
58-
case NotOnTop => context become currentBehavior(state.notOnTop())
59-
}
60-
61-
private def running(state: ImporterState): Receive = handleTopMessages(state, running) orElse {
55+
private def running(state: ImporterState): Receive = {
6256
case ReceiveTimeout => self ! PickBlocks
6357

64-
case PrintStatus => log.info("Block: {}, is on top?: {}", blockchain.getBestBlockNumber(), state.isOnTop)
65-
6658
case BlockFetcher.PickedBlocks(blocks) =>
6759
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
6860
importBlocks(blocks, DefaultBlockImport)(state)
@@ -89,7 +81,7 @@ class BlockImporter(
8981
internally = true
9082
)(state)
9183

92-
case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing =>
84+
case ImportNewBlock(block, peerId) if !state.importing =>
9385
importBlock(
9486
block,
9587
new NewBlockImportMessages(block, peerId),
@@ -121,36 +113,34 @@ class BlockImporter(
121113
running(state.resolvingBranch(from))
122114

123115
private def start(): Unit = {
124-
log.debug("Starting Regular Sync, current best block is {}", startingBlockNumber)
125-
fetcher ! BlockFetcher.Start(self, startingBlockNumber)
126-
supervisor ! ProgressProtocol.StartingFrom(startingBlockNumber)
116+
log.debug("Starting Regular Sync, current best block is {}", bestKnownBlockNumber)
117+
fetcher ! BlockFetcher.Start(self, bestKnownBlockNumber)
118+
supervisor ! ProgressProtocol.StartingFrom(bestKnownBlockNumber)
127119
context become running(ImporterState.initial)
128120
}
129121

130122
private def pickBlocks(state: ImporterState): Unit = {
131-
val msg =
132-
state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand](
133-
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self)
134-
)(from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber, self))
123+
val msg = state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand](
124+
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self)
125+
)(from => BlockFetcher.StrictPickBlocks(from, bestKnownBlockNumber, self))
135126

136127
fetcher ! msg
137128
}
138129

139130
private def importBlocks(blocks: NonEmptyList[Block], blockImportType: BlockImportType): ImportFn = importWith(
140-
{
141-
Task(
131+
Task
132+
.now {
142133
log.debug(
143134
"Attempting to import blocks starting from {} and ending with {}",
144135
blocks.head.number,
145136
blocks.last.number
146137
)
147-
)
148-
.flatMap(_ => Task.now(resolveBranch(blocks)))
149-
.flatMap {
150-
case Right(blocksToImport) => handleBlocksImport(blocksToImport)
151-
case Left(resolvingFrom) => Task.now(ResolvingBranch(resolvingFrom))
152-
}
153-
},
138+
resolveBranch(blocks)
139+
}
140+
.flatMap {
141+
case Right(blocksToImport) => handleBlocksImport(blocksToImport)
142+
case Left(resolvingFrom) => Task.now(ResolvingBranch(resolvingFrom))
143+
},
154144
blockImportType
155145
)
156146

@@ -187,12 +177,9 @@ class BlockImporter(
187177
importedBlocks: List[Block] = Nil
188178
): Task[(List[Block], Option[Any])] =
189179
if (blocks.isEmpty) {
190-
importedBlocks.headOption match {
191-
case Some(block) =>
192-
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false)
193-
case None => ()
194-
}
195-
180+
importedBlocks.headOption.foreach(block =>
181+
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false)
182+
)
196183
Task.now((importedBlocks, None))
197184
} else {
198185
val restOfBlocks = blocks.tail
@@ -244,27 +231,22 @@ class BlockImporter(
244231
broadcastBlocks(blocks, weights)
245232
updateTxPool(importedBlocksData.map(_.block), Seq.empty)
246233
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally)
247-
case BlockEnqueued => ()
248-
case DuplicateBlock => ()
249-
case UnknownParent => () // This is normal when receiving broadcast blocks
250234
case ChainReorganised(oldBranch, newBranch, weights) =>
251235
updateTxPool(newBranch, oldBranch)
252236
broadcastBlocks(newBranch, weights)
253-
newBranch.lastOption match {
254-
case Some(newBlock) =>
255-
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally)
256-
case None => ()
257-
}
237+
newBranch.lastOption.foreach(block =>
238+
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally)
239+
)
258240
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
259241
// state node re-download will be handled when downloading headers
260242
doLog(importMessages.missingStateNode(missingNodeException))
261243
Running
262244
case BlockImportFailedDueToMissingNode(missingNodeException) =>
263245
Task.raiseError(missingNodeException)
264-
case BlockImportFailed(error) =>
265-
if (informFetcherOnFail) {
266-
fetcher ! BlockFetcher.BlockImportFailed(block.number, BlacklistReason.BlockImportError(error))
267-
}
246+
case BlockImportFailed(error) if informFetcherOnFail =>
247+
fetcher ! BlockFetcher.BlockImportFailed(block.number, BlacklistReason.BlockImportError(error))
248+
case BlockEnqueued | DuplicateBlock | UnknownParent | BlockImportFailed(_) => ()
249+
case result => log.error("Unknown block import result {}", result)
268250
}
269251
.map(_ => Running)
270252
},
@@ -279,9 +261,7 @@ class BlockImporter(
279261

280262
private def updateTxPool(blocksAdded: Seq[Block], blocksRemoved: Seq[Block]): Unit = {
281263
blocksRemoved.foreach(block => pendingTransactionsManager ! AddUncheckedTransactions(block.body.transactionList))
282-
blocksAdded.foreach { block =>
283-
pendingTransactionsManager ! RemoveTransactions(block.body.transactionList)
284-
}
264+
blocksAdded.foreach(block => pendingTransactionsManager ! RemoveTransactions(block.body.transactionList))
285265
}
286266

287267
private def importWith(importTask: Task[NewBehavior], blockImportType: BlockImportType)(
@@ -303,7 +283,6 @@ class BlockImporter(
303283
case NewBetterBranch(oldBranch) =>
304284
val transactionsToAdd = oldBranch.flatMap(_.body.transactionList)
305285
pendingTransactionsManager ! PendingTransactionsManager.AddUncheckedTransactions(transactionsToAdd)
306-
307286
// Add first block from branch as an ommer
308287
oldBranch.headOption.map(_.header).foreach(ommersPool ! AddOmmers(_))
309288
Right(blocks.toList)
@@ -312,23 +291,21 @@ class BlockImporter(
312291
ommersPool ! AddOmmers(blocks.head.header)
313292
Right(Nil)
314293
case UnknownBranch =>
315-
val currentBlock = blocks.head.number.min(startingBlockNumber)
294+
val currentBlock = blocks.head.number.min(bestKnownBlockNumber)
316295
val goingBackTo = (currentBlock - syncConfig.branchResolutionRequestSize).max(0)
317296
val msg = s"Unknown branch, going back to block nr $goingBackTo in order to resolve branches"
318-
319297
log.info(msg)
320298
fetcher ! BlockFetcher.InvalidateBlocksFrom(goingBackTo, msg, shouldBlacklist = false)
321299
Left(goingBackTo)
322300
case InvalidBranch =>
323301
val goingBackTo = blocks.head.number
324302
val msg = s"Invalid branch, going back to $goingBackTo"
325-
326303
log.info(msg)
327304
fetcher ! BlockFetcher.InvalidateBlocksFrom(goingBackTo, msg)
328305
Right(Nil)
329306
}
330307

331-
private def startingBlockNumber: BigInt = blockchain.getBestBlockNumber()
308+
private def bestKnownBlockNumber: BigInt = blockchain.getBestBlockNumber()
332309

333310
private def getBehavior(newBehavior: NewBehavior, blockImportType: BlockImportType): Behavior = newBehavior match {
334311
case Running => running
@@ -338,7 +315,6 @@ class BlockImporter(
338315
}
339316

340317
object BlockImporter {
341-
// scalastyle:off parameter.number
342318
def props(
343319
fetcher: ActorRef,
344320
ledger: Ledger,
@@ -367,8 +343,6 @@ object BlockImporter {
367343

368344
sealed trait ImporterMsg
369345
case object Start extends ImporterMsg
370-
case object OnTop extends ImporterMsg
371-
case object NotOnTop extends ImporterMsg
372346
case class MinedBlock(block: Block) extends ImporterMsg
373347
case class NewCheckpoint(block: Block) extends ImporterMsg
374348
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
@@ -402,14 +376,9 @@ object BlockImporter {
402376
}
403377

404378
case class ImporterState(
405-
isOnTop: Boolean,
406379
importing: Boolean,
407380
resolvingBranchFrom: Option[BigInt]
408381
) {
409-
def onTop(): ImporterState = copy(isOnTop = true)
410-
411-
def notOnTop(): ImporterState = copy(isOnTop = false)
412-
413382
def importingBlocks(): ImporterState = copy(importing = true)
414383

415384
def notImportingBlocks(): ImporterState = copy(importing = false)
@@ -423,7 +392,6 @@ object BlockImporter {
423392

424393
object ImporterState {
425394
def initial: ImporterState = ImporterState(
426-
isOnTop = false,
427395
importing = false,
428396
resolvingBranchFrom = None
429397
)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,6 @@ class RegularSync(
6464
fetcher.toClassic,
6565
BlockFetcher.PrintStatus
6666
)(context.dispatcher)
67-
val printImporterSchedule: Cancellable =
68-
scheduler.scheduleWithFixedDelay(
69-
syncConfig.printStatusInterval,
70-
syncConfig.printStatusInterval,
71-
importer,
72-
BlockImporter.PrintStatus
73-
)(context.dispatcher)
7467

7568
override def receive: Receive = running(
7669
ProgressState(startedFetching = false, initialBlock = 0, currentBlock = 0, bestKnownNetworkBlock = 0)
@@ -115,7 +108,6 @@ class RegularSync(
115108
override def postStop(): Unit = {
116109
log.info("Regular Sync stopped")
117110
printFetcherSchedule.cancel()
118-
printImporterSchedule.cancel()
119111
}
120112
}
121113
object RegularSync {

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
121121

122122
// Fetcher should not enqueue any new block
123123
importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref))
124-
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
125124
importer.expectNoMessage(100.millis)
126125
}
127126

@@ -152,7 +151,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
152151
importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref))
153152
}
154153

155-
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
156154
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
157155
blocks.map(_.hash).toList shouldEqual firstBlocksBatch.map(_.hash)
158156
}
@@ -182,7 +180,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
182180

183181
// If we try to pick the whole chain we should only receive the first part
184182
importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref))
185-
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
186183
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
187184
blocks.map(_.hash).toList shouldEqual subChain1.map(_.hash)
188185
}
@@ -243,7 +240,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
243240
)
244241

245242
importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref))
246-
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
247243
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
248244
val headers = blocks.map(_.header).toList
249245

0 commit comments

Comments
 (0)