Skip to content

Commit 98b2915

Browse files
committed
ETCM-732: Handle missing state node in regular sync using Either instead of Task.
1 parent 3ea65c1 commit 98b2915

File tree

5 files changed

+68
-22
lines changed

5 files changed

+68
-22
lines changed

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,25 @@ import io.iohk.ethereum.domain._
1414
import io.iohk.ethereum.mpt.MerklePatriciaTrie
1515
import io.iohk.ethereum.utils.Config.SyncConfig
1616
import io.iohk.ethereum.utils.Config
17-
import io.iohk.ethereum.{Fixtures, Mocks, ObjectGenerators, crypto}
17+
import io.iohk.ethereum.{Fixtures, Mocks, NormalPatience, ObjectGenerators, Timeouts, crypto}
1818
import io.iohk.ethereum.ledger.Ledger.BlockResult
1919
import monix.execution.Scheduler
2020
import org.scalamock.scalatest.MockFactory
2121
import org.scalatest.BeforeAndAfterAll
22-
import org.scalatest.concurrent.Eventually.eventually
23-
import org.scalatest.flatspec.AsyncFlatSpecLike
22+
import org.scalatest.concurrent.Eventually
23+
import org.scalatest.flatspec.AnyFlatSpecLike
2424
import org.scalatest.matchers.should.Matchers
2525

2626
import scala.concurrent.duration._
2727

2828
class BlockImporterItSpec
2929
extends MockFactory
3030
with TestSetupWithVmAndValidators
31-
with AsyncFlatSpecLike
31+
with AnyFlatSpecLike
3232
with Matchers
33-
with BeforeAndAfterAll {
33+
with BeforeAndAfterAll
34+
with Eventually
35+
with NormalPatience {
3436

3537
implicit val testScheduler = Scheduler.fixedPool("test", 32)
3638

@@ -162,7 +164,7 @@ class BlockImporterItSpec
162164

163165
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
164166

165-
eventually { Thread.sleep(200); blockchain.getBestBlock().get shouldEqual newBlock3 }
167+
eventually { blockchain.getBestBlock().get shouldEqual newBlock3 }
166168
}
167169

168170
it should "return Unknown branch, in case of PickedBlocks with block that has a parent that's not in the chain" in {
@@ -235,9 +237,41 @@ class BlockImporterItSpec
235237
val checkpointBlock = checkpointBlockGenerator.generate(newBlock5, Checkpoint(signatures))
236238
blockImporter ! NewCheckpoint(checkpointBlock)
237239

238-
eventually { Thread.sleep(1000); blockchain.getBestBlock().get shouldEqual checkpointBlock }
240+
eventually { blockchain.getBestBlock().get shouldEqual checkpointBlock }
241+
eventually { blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock5.header.number + 1 }
242+
}
243+
244+
it should "ask BlockFetcher to resolve missing node" in {
245+
val parent = blockchain.getBestBlock().get
246+
val newBlock: Block = getBlock(genesisBlock.number + 5, difficulty = 104, parent = parent.header.hash)
247+
val invalidBlock = newBlock.copy(header = newBlock.header.copy(beneficiary = Address(111).bytes))
248+
249+
val ledger = new TestLedgerImpl(successValidators)
250+
val blockImporter = system.actorOf(
251+
BlockImporter.props(
252+
fetcherProbe.ref,
253+
ledger,
254+
blockchain,
255+
syncConfig,
256+
ommersPoolProbe.ref,
257+
broadcasterProbe.ref,
258+
pendingTransactionsManagerProbe.ref,
259+
supervisor.ref
260+
)
261+
)
262+
263+
blockImporter ! BlockImporter.Start
264+
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(List(invalidBlock)))
265+
239266
eventually {
240-
Thread.sleep(1000); blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock5.header.number + 1
267+
268+
val msg = fetcherProbe.fishForMessage(Timeouts.longTimeout) {
269+
case BlockFetcher.FetchStateNode(_) => true
270+
case _ => false
271+
}.asInstanceOf[BlockFetcher.FetchStateNode]
272+
273+
msg.hash.length should be > 0
241274
}
275+
242276
}
243277
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@ class BlockImporter(
208208
case DuplicateBlock | BlockEnqueued =>
209209
tryImportBlocks(restOfBlocks, importedBlocks)
210210

211+
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
212+
Task.now((importedBlocks, Some(missingNodeException)))
213+
214+
case BlockImportFailedDueToMissingNode(missingNodeException) =>
215+
Task.raiseError(missingNodeException)
216+
211217
case err @ (UnknownParent | BlockImportFailed(_)) =>
212218
log.error(
213219
"Block {} import failed, with hash {} and parent hash {}",
@@ -217,10 +223,6 @@ class BlockImporter(
217223
)
218224
Task.now((importedBlocks, Some(err)))
219225
}
220-
.onErrorHandle {
221-
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
222-
(importedBlocks, Some(missingNodeEx))
223-
}
224226
}
225227

226228
private def importBlock(
@@ -253,18 +255,18 @@ class BlockImporter(
253255
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, block.hasCheckpoint, internally)
254256
case None => ()
255257
}
258+
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
259+
// state node re-download will be handled when downloading headers
260+
doLog(importMessages.missingStateNode(missingNodeException))
261+
Running
262+
case BlockImportFailedDueToMissingNode(missingNodeException) =>
263+
Task.raiseError(missingNodeException)
256264
case BlockImportFailed(error) =>
257265
if (informFetcherOnFail) {
258266
fetcher ! BlockFetcher.BlockImportFailed(block.number, error)
259267
}
260268
}
261269
.map(_ => Running)
262-
.recover {
263-
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
264-
// state node re-download will be handled when downloading headers
265-
doLog(importMessages.missingStateNode(missingNodeEx))
266-
Running
267-
}
268270
},
269271
blockImportType
270272
)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/ImportMessages.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ sealed abstract class ImportMessages(block: Block) {
3030
case UnknownParent => orphaned()
3131
case ChainReorganised(_, newBranch, _) => reorganisedChain(newBranch)
3232
case BlockImportFailed(error) => importFailed(error)
33+
case BlockImportFailedDueToMissingNode(reason) => missingStateNode(reason)
3334
}
3435
}
3536

src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package io.iohk.ethereum.ledger
33
import akka.util.ByteString
44
import io.iohk.ethereum.consensus.validators.BlockHeaderError.HeaderParentNotFoundError
55
import io.iohk.ethereum.domain._
6-
import io.iohk.ethereum.ledger.BlockExecutionError.ValidationBeforeExecError
6+
import io.iohk.ethereum.ledger.BlockExecutionError.{MPTError, ValidationBeforeExecError}
77
import io.iohk.ethereum.ledger.BlockQueue.Leaf
8+
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
89
import io.iohk.ethereum.utils.{ByteStringUtils, Logger}
910
import monix.eval.Task
1011
import monix.execution.Scheduler
@@ -57,6 +58,9 @@ class BlockImport(
5758
case None =>
5859
BlockImportedToTop(importedBlocks)
5960

61+
case Some(MPTError(reason)) if reason.isInstanceOf[MissingNodeException] =>
62+
BlockImportFailedDueToMissingNode(reason.asInstanceOf[MissingNodeException])
63+
6064
case Some(error) if importedBlocks.isEmpty =>
6165
blockQueue.removeSubtree(block.header.hash)
6266
BlockImportFailed(error.toString)
@@ -164,7 +168,10 @@ class BlockImport(
164168
reorgResult match {
165169
case Some(execResult) =>
166170
execResult.fold(
167-
err => BlockImportFailed(s"Error while trying to reorganise chain: $err"),
171+
{
172+
case MPTError(reason: MissingNodeException) => BlockImportFailedDueToMissingNode(reason)
173+
case err => BlockImportFailed(s"Error while trying to reorganise chain: $err")
174+
},
168175
ChainReorganised.tupled
169176
)
170177

@@ -274,4 +281,6 @@ case class ChainReorganised(
274281

275282
case class BlockImportFailed(error: String) extends BlockImportResult
276283

284+
case class BlockImportFailedDueToMissingNode(reason: MissingNodeException) extends BlockImportResult
285+
277286
case object UnknownParent extends BlockImportResult

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ class RegularSyncSpec
295295
"fetching state node" should {
296296
abstract class MissingStateNodeFixture(system: ActorSystem) extends Fixture(system) {
297297
val failingBlock: Block = testBlocksChunked.head.head
298-
ledger.setImportResult(failingBlock, Task.raiseError(new MissingNodeException(failingBlock.hash)))
298+
ledger.setImportResult(failingBlock, Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))
299299
}
300300

301301
"blacklist peer which returns empty response" in sync(new MissingStateNodeFixture(testSystem) {
@@ -366,7 +366,7 @@ class RegularSyncSpec
366366
(ledger
367367
.importBlock(_: Block)(_: Scheduler))
368368
.when(*, *)
369-
.returns(Task.raiseError(new MissingNodeException(failingBlock.hash)))
369+
.returns(Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))
370370

371371
var saveNodeWasCalled: Boolean = false
372372
val nodeData = List(ByteString(failingBlock.header.toBytes: Array[Byte]))

0 commit comments

Comments
 (0)