Skip to content

Commit 7743570

Browse files
committed
ETCM-732: Handle missing state node in regular sync using Either instead of Task.
1 parent 0a7a3d0 commit 7743570

File tree

5 files changed

+68
-17
lines changed

5 files changed

+68
-17
lines changed

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import cats.data.NonEmptyList
66
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.NewCheckpoint
77
import io.iohk.ethereum.blockchain.sync.regular.{BlockFetcher, BlockImporter}
88
import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers
9-
import io.iohk.ethereum.consensus.{GetBlockHeaderByHash, GetNBlocksBack}
109
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
1110
import io.iohk.ethereum.consensus.ethash.validators.{OmmersValidator, StdOmmersValidator}
1211
import io.iohk.ethereum.consensus.validators.Validators
12+
import io.iohk.ethereum.consensus.{GetBlockHeaderByHash, GetNBlocksBack}
1313
import io.iohk.ethereum.domain._
14+
import io.iohk.ethereum.ledger.Ledger.BlockResult
1415
import io.iohk.ethereum.mpt.MerklePatriciaTrie
15-
import io.iohk.ethereum.utils.Config.SyncConfig
1616
import io.iohk.ethereum.utils.Config
17+
import io.iohk.ethereum.utils.Config.SyncConfig
1718
import io.iohk.ethereum.{Fixtures, Mocks, ObjectGenerators, crypto}
18-
import io.iohk.ethereum.ledger.Ledger.BlockResult
1919
import monix.execution.Scheduler
2020
import org.scalamock.scalatest.MockFactory
2121
import org.scalatest.BeforeAndAfterAll
@@ -240,4 +240,39 @@ class BlockImporterItSpec
240240
Thread.sleep(1000); blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock5.header.number + 1
241241
}
242242
}
243+
244+
it should "ask BlockFetcher to resolve missing node" in {
245+
val parent = blockchain.getBestBlock().get
246+
val newBlock: Block = getBlock(genesisBlock.number + 5, difficulty = 104, parent = parent.header.hash)
247+
val invalidBlock = newBlock.copy(header = newBlock.header.copy(beneficiary = Address(111).bytes))
248+
249+
val ledger = new TestLedgerImpl(successValidators)
250+
val blockImporter = system.actorOf(
251+
BlockImporter.props(
252+
fetcherProbe.ref,
253+
ledger,
254+
blockchain,
255+
syncConfig,
256+
ommersPoolProbe.ref,
257+
broadcasterProbe.ref,
258+
pendingTransactionsManagerProbe.ref,
259+
supervisor.ref
260+
)
261+
)
262+
263+
blockImporter ! BlockImporter.Start
264+
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(List(invalidBlock)))
265+
266+
eventually {
267+
Thread.sleep(1000)
268+
269+
val msg = fetcherProbe.fishForMessage() {
270+
case BlockFetcher.FetchStateNode(_) => true
271+
case _ => false
272+
}.asInstanceOf[BlockFetcher.FetchStateNode]
273+
274+
msg.hash.length should be > 0
275+
}
276+
277+
}
243278
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,16 @@ class BlockImporter(
206206
case DuplicateBlock | BlockEnqueued =>
207207
tryImportBlocks(restOfBlocks, importedBlocks)
208208

209+
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
210+
Task.now((importedBlocks, Some(missingNodeException)))
211+
212+
case BlockImportFailedDueToMissingNode(missingNodeException) =>
213+
Task.raiseError(missingNodeException)
214+
209215
case err @ (UnknownParent | BlockImportFailed(_)) =>
210216
log.error("Block {} import failed", blocks.head.number)
211217
Task.now((importedBlocks, Some(err)))
212218
}
213-
.onErrorHandle {
214-
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
215-
(importedBlocks, Some(missingNodeEx))
216-
}
217219
}
218220

219221
private def importBlock(
@@ -246,18 +248,18 @@ class BlockImporter(
246248
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, block.hasCheckpoint, internally)
247249
case None => ()
248250
}
251+
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
252+
// state node re-download will be handled when downloading headers
253+
doLog(importMessages.missingStateNode(missingNodeException))
254+
Running
255+
case BlockImportFailedDueToMissingNode(missingNodeException) =>
256+
Task.raiseError(missingNodeException)
249257
case BlockImportFailed(error) =>
250258
if (informFetcherOnFail) {
251259
fetcher ! BlockFetcher.BlockImportFailed(block.number, error)
252260
}
253261
}
254262
.map(_ => Running)
255-
.recover {
256-
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
257-
// state node re-download will be handled when downloading headers
258-
doLog(importMessages.missingStateNode(missingNodeEx))
259-
Running
260-
}
261263
},
262264
blockImportType
263265
)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/ImportMessages.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ sealed abstract class ImportMessages(block: Block) {
3030
case UnknownParent => orphaned()
3131
case ChainReorganised(_, newBranch, _) => reorganisedChain(newBranch)
3232
case BlockImportFailed(error) => importFailed(error)
33+
case BlockImportFailedDueToMissingNode(reason) => missingStateNode(reason)
3334
}
3435
}
3536

src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package io.iohk.ethereum.ledger
33
import akka.util.ByteString
44
import io.iohk.ethereum.consensus.validators.BlockHeaderError.HeaderParentNotFoundError
55
import io.iohk.ethereum.domain._
6-
import io.iohk.ethereum.ledger.BlockExecutionError.ValidationBeforeExecError
6+
import io.iohk.ethereum.ledger.BlockExecutionError.{MPTError, ValidationBeforeExecError}
77
import io.iohk.ethereum.ledger.BlockQueue.Leaf
8+
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
89
import io.iohk.ethereum.utils.{ByteStringUtils, Logger}
910
import monix.eval.Task
1011
import monix.execution.Scheduler
@@ -54,6 +55,9 @@ class BlockImport(
5455
case None =>
5556
BlockImportedToTop(importedBlocks)
5657

58+
case Some(MPTError(reason)) if reason.isInstanceOf[MissingNodeException] =>
59+
BlockImportFailedDueToMissingNode(reason.asInstanceOf[MissingNodeException])
60+
5761
case Some(error) if importedBlocks.isEmpty =>
5862
blockQueue.removeSubtree(block.header.hash)
5963
BlockImportFailed(error.toString)
@@ -161,7 +165,11 @@ class BlockImport(
161165
reorgResult match {
162166
case Some(execResult) =>
163167
execResult.fold(
164-
err => BlockImportFailed(s"Error while trying to reorganise chain: $err"),
168+
{
169+
case MPTError(reason) if reason.isInstanceOf[MissingNodeException] =>
170+
BlockImportFailedDueToMissingNode(reason.asInstanceOf[MissingNodeException])
171+
case err => BlockImportFailed(s"Error while trying to reorganise chain: $err")
172+
},
165173
ChainReorganised.tupled
166174
)
167175

@@ -180,6 +188,9 @@ class BlockImport(
180188
case None =>
181189
Right(oldBlocksData.map(_.block), executedBlocks.map(_.block), executedBlocks.map(_.weight))
182190

191+
case Some(MPTError(reason)) if reason.isInstanceOf[MissingNodeException] =>
192+
Left(MPTError(reason))
193+
183194
case Some(error) =>
184195
revertChainReorganisation(newBranch, oldBlocksData, executedBlocks)
185196
Left(error)
@@ -271,4 +282,6 @@ case class ChainReorganised(
271282

272283
case class BlockImportFailed(error: String) extends BlockImportResult
273284

285+
case class BlockImportFailedDueToMissingNode(reason: MissingNodeException) extends BlockImportResult
286+
274287
case object UnknownParent extends BlockImportResult

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ class RegularSyncSpec
295295
"fetching state node" should {
296296
abstract class MissingStateNodeFixture(system: ActorSystem) extends Fixture(system) {
297297
val failingBlock: Block = testBlocksChunked.head.head
298-
ledger.setImportResult(failingBlock, Task.raiseError(new MissingNodeException(failingBlock.hash)))
298+
ledger.setImportResult(failingBlock, Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))
299299
}
300300

301301
"blacklist peer which returns empty response" in sync(new MissingStateNodeFixture(testSystem) {
@@ -366,7 +366,7 @@ class RegularSyncSpec
366366
(ledger
367367
.importBlock(_: Block)(_: Scheduler))
368368
.when(*, *)
369-
.returns(Task.raiseError(new MissingNodeException(failingBlock.hash)))
369+
.returns(Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))
370370

371371
var saveNodeWasCalled: Boolean = false
372372
val nodeData = List(ByteString(failingBlock.header.toBytes: Array[Byte]))

0 commit comments

Comments
 (0)