Skip to content

Commit 2db9ba8

Browse files
committed
ETCM-732: Handle missing state node in regular sync using Either instead of Task.
1 parent 2999dde commit 2db9ba8

File tree

5 files changed

+70
-23
lines changed

5 files changed

+70
-23
lines changed

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import io.iohk.ethereum.ledger.Ledger.BlockResult
2020
import monix.execution.Scheduler
2121
import org.scalamock.scalatest.MockFactory
2222
import org.scalatest.BeforeAndAfterAll
23-
import org.scalatest.concurrent.Eventually.eventually
23+
import org.scalatest.concurrent.Eventually
2424
import org.scalatest.flatspec.AsyncFlatSpecLike
2525
import org.scalatest.matchers.should.Matchers
2626

@@ -31,7 +31,8 @@ class BlockImporterItSpec
3131
with TestSetupWithVmAndValidators
3232
with AsyncFlatSpecLike
3333
with Matchers
34-
with BeforeAndAfterAll {
34+
with BeforeAndAfterAll
35+
with Eventually {
3536

3637
implicit val testScheduler = Scheduler.fixedPool("test", 32)
3738

@@ -151,7 +152,7 @@ class BlockImporterItSpec
151152
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
152153

153154
//because the blocks are not valid, we shouldn't reorganise, but at least stay with a current chain, and the best block of the current chain is oldBlock4
154-
eventually { blockchain.getBestBlock().get shouldEqual oldBlock4 }
155+
eventually {Thread.sleep(1000); blockchain.getBestBlock().get shouldEqual oldBlock4 }
155156
}
156157

157158
it should "return a correct new best block after reorganising longer chain to a shorter one if its weight is bigger" in {
@@ -163,7 +164,7 @@ class BlockImporterItSpec
163164

164165
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
165166

166-
eventually { Thread.sleep(200); blockchain.getBestBlock().get shouldEqual newBlock3 }
167+
eventually { Thread.sleep(1000); blockchain.getBestBlock().get shouldEqual newBlock3 }
167168
}
168169

169170
it should "return Unknown branch, in case of PickedBlocks with block that has a parent that's not in the chain" in {
@@ -190,7 +191,7 @@ class BlockImporterItSpec
190191
//not reorganising anymore until oldBlock4(not part of the chain anymore), no block/ommer validation when not part of the chain, resolveBranch is returning UnknownBranch
191192
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(List(newBlock5ParentOldBlock4)))
192193

193-
eventually { blockchain.getBestBlock().get shouldEqual newBlock4ParentOldBlock3 }
194+
eventually {Thread.sleep(1000); blockchain.getBestBlock().get shouldEqual newBlock4ParentOldBlock3 }
194195
}
195196

196197
it should "switch to a branch with a checkpoint" in {
@@ -203,8 +204,8 @@ class BlockImporterItSpec
203204

204205
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
205206

206-
eventually { blockchain.getBestBlock().get shouldEqual oldBlock5WithCheckpoint }
207-
eventually { blockchain.getLatestCheckpointBlockNumber() shouldEqual oldBlock5WithCheckpoint.header.number }
207+
eventually { Thread.sleep(1000); blockchain.getBestBlock().get shouldEqual oldBlock5WithCheckpoint }
208+
eventually { Thread.sleep(1000); blockchain.getLatestCheckpointBlockNumber() shouldEqual oldBlock5WithCheckpoint.header.number }
208209
}
209210

210211
it should "switch to a branch with a newer checkpoint" in {
@@ -217,8 +218,8 @@ class BlockImporterItSpec
217218

218219
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
219220

220-
eventually { blockchain.getBestBlock().get shouldEqual newBlock4WithCheckpoint }
221-
eventually { blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock4WithCheckpoint.header.number }
221+
eventually {Thread.sleep(1000); blockchain.getBestBlock().get shouldEqual newBlock4WithCheckpoint }
222+
eventually {Thread.sleep(1000); blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock4WithCheckpoint.header.number }
222223
}
223224

224225
it should "return a correct checkpointed block after receiving a request for generating a new checkpoint" in {
@@ -241,4 +242,38 @@ class BlockImporterItSpec
241242
Thread.sleep(1000); blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock5.header.number + 1
242243
}
243244
}
245+
246+
it should "ask BlockFetcher to resolve missing node" in {
247+
val parent = blockchain.getBestBlock().get
248+
val newBlock: Block = getBlock(genesisBlock.number + 5, difficulty = 104, parent = parent.header.hash)
249+
val invalidBlock = newBlock.copy(header = newBlock.header.copy(beneficiary = Address(111).bytes))
250+
251+
val ledger = new TestLedgerImpl(successValidators)
252+
val blockImporter = system.actorOf(
253+
BlockImporter.props(
254+
fetcherProbe.ref,
255+
ledger,
256+
blockchain,
257+
syncConfig,
258+
ommersPoolProbe.ref,
259+
broadcasterProbe.ref,
260+
pendingTransactionsManagerProbe.ref,
261+
supervisor.ref
262+
)
263+
)
264+
265+
blockImporter ! BlockImporter.Start
266+
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(List(invalidBlock)))
267+
268+
eventually {
269+
Thread.sleep(1000);
270+
val msg = fetcherProbe.fishForMessage(5.seconds) {
271+
case BlockFetcher.FetchStateNode(_) => true
272+
case _ => false
273+
}.asInstanceOf[BlockFetcher.FetchStateNode]
274+
275+
msg.hash.length should be > 0
276+
}
277+
278+
}
244279
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@ class BlockImporter(
208208
case DuplicateBlock | BlockEnqueued =>
209209
tryImportBlocks(restOfBlocks, importedBlocks)
210210

211+
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
212+
Task.now((importedBlocks, Some(missingNodeException)))
213+
214+
case BlockImportFailedDueToMissingNode(missingNodeException) =>
215+
Task.raiseError(missingNodeException)
216+
211217
case err @ (UnknownParent | BlockImportFailed(_)) =>
212218
log.error(
213219
"Block {} import failed, with hash {} and parent hash {}",
@@ -217,10 +223,6 @@ class BlockImporter(
217223
)
218224
Task.now((importedBlocks, Some(err)))
219225
}
220-
.onErrorHandle {
221-
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
222-
(importedBlocks, Some(missingNodeEx))
223-
}
224226
}
225227

226228
private def importBlock(
@@ -253,18 +255,18 @@ class BlockImporter(
253255
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, block.hasCheckpoint, internally)
254256
case None => ()
255257
}
258+
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
259+
// state node re-download will be handled when downloading headers
260+
doLog(importMessages.missingStateNode(missingNodeException))
261+
Running
262+
case BlockImportFailedDueToMissingNode(missingNodeException) =>
263+
Task.raiseError(missingNodeException)
256264
case BlockImportFailed(error) =>
257265
if (informFetcherOnFail) {
258266
fetcher ! BlockFetcher.BlockImportFailed(block.number, error)
259267
}
260268
}
261269
.map(_ => Running)
262-
.recover {
263-
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
264-
// state node re-download will be handled when downloading headers
265-
doLog(importMessages.missingStateNode(missingNodeEx))
266-
Running
267-
}
268270
},
269271
blockImportType
270272
)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/ImportMessages.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ sealed abstract class ImportMessages(block: Block) {
3030
case UnknownParent => orphaned()
3131
case ChainReorganised(_, newBranch, _) => reorganisedChain(newBranch)
3232
case BlockImportFailed(error) => importFailed(error)
33+
case BlockImportFailedDueToMissingNode(reason) => missingStateNode(reason)
3334
}
3435
}
3536

src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package io.iohk.ethereum.ledger
33
import akka.util.ByteString
44
import io.iohk.ethereum.consensus.validators.BlockHeaderError.HeaderParentNotFoundError
55
import io.iohk.ethereum.domain._
6-
import io.iohk.ethereum.ledger.BlockExecutionError.ValidationBeforeExecError
6+
import io.iohk.ethereum.ledger.BlockExecutionError.{MPTError, ValidationBeforeExecError}
77
import io.iohk.ethereum.ledger.BlockQueue.Leaf
8+
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
89
import io.iohk.ethereum.utils.{ByteStringUtils, Logger}
910
import monix.eval.Task
1011
import monix.execution.Scheduler
@@ -57,6 +58,9 @@ class BlockImport(
5758
case None =>
5859
BlockImportedToTop(importedBlocks)
5960

61+
case Some(MPTError(reason)) if reason.isInstanceOf[MissingNodeException] =>
62+
BlockImportFailedDueToMissingNode(reason.asInstanceOf[MissingNodeException])
63+
6064
case Some(error) if importedBlocks.isEmpty =>
6165
blockQueue.removeSubtree(block.header.hash)
6266
BlockImportFailed(error.toString)
@@ -164,7 +168,10 @@ class BlockImport(
164168
reorgResult match {
165169
case Some(execResult) =>
166170
execResult.fold(
167-
err => BlockImportFailed(s"Error while trying to reorganise chain: $err"),
171+
{
172+
case MPTError(reason: MissingNodeException) => BlockImportFailedDueToMissingNode(reason)
173+
case err => BlockImportFailed(s"Error while trying to reorganise chain: $err")
174+
},
168175
ChainReorganised.tupled
169176
)
170177

@@ -274,4 +281,6 @@ case class ChainReorganised(
274281

275282
case class BlockImportFailed(error: String) extends BlockImportResult
276283

284+
case class BlockImportFailedDueToMissingNode(reason: MissingNodeException) extends BlockImportResult
285+
277286
case object UnknownParent extends BlockImportResult

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ class RegularSyncSpec
295295
"fetching state node" should {
296296
abstract class MissingStateNodeFixture(system: ActorSystem) extends Fixture(system) {
297297
val failingBlock: Block = testBlocksChunked.head.head
298-
ledger.setImportResult(failingBlock, Task.raiseError(new MissingNodeException(failingBlock.hash)))
298+
ledger.setImportResult(failingBlock, Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))
299299
}
300300

301301
"blacklist peer which returns empty response" in sync(new MissingStateNodeFixture(testSystem) {
@@ -366,7 +366,7 @@ class RegularSyncSpec
366366
(ledger
367367
.importBlock(_: Block)(_: Scheduler))
368368
.when(*, *)
369-
.returns(Task.raiseError(new MissingNodeException(failingBlock.hash)))
369+
.returns(Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))
370370

371371
var saveNodeWasCalled: Boolean = false
372372
val nodeData = List(ByteString(failingBlock.header.toBytes: Array[Byte]))

0 commit comments

Comments
 (0)