Skip to content

ETCM-732: Handle missing state node in regular sync after fast sync is done #961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@ import io.iohk.ethereum.domain._
import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.{Fixtures, Mocks, ObjectGenerators, crypto}
import io.iohk.ethereum.{Fixtures, Mocks, NormalPatience, ObjectGenerators, Timeouts, crypto}
import io.iohk.ethereum.ledger.Ledger.BlockResult
import monix.execution.Scheduler
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.flatspec.AsyncFlatSpecLike
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._

class BlockImporterItSpec
extends MockFactory
with TestSetupWithVmAndValidators
with AsyncFlatSpecLike
with AnyFlatSpecLike
with Matchers
with BeforeAndAfterAll {
with BeforeAndAfterAll
with Eventually
with NormalPatience {

implicit val testScheduler = Scheduler.fixedPool("test", 32)

Expand Down Expand Up @@ -162,7 +164,7 @@ class BlockImporterItSpec

blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))

eventually { Thread.sleep(200); blockchain.getBestBlock().get shouldEqual newBlock3 }
eventually { blockchain.getBestBlock().get shouldEqual newBlock3 }
}

it should "return Unknown branch, in case of PickedBlocks with block that has a parent that's not in the chain" in {
Expand Down Expand Up @@ -235,9 +237,41 @@ class BlockImporterItSpec
val checkpointBlock = checkpointBlockGenerator.generate(newBlock5, Checkpoint(signatures))
blockImporter ! NewCheckpoint(checkpointBlock)

eventually { Thread.sleep(1000); blockchain.getBestBlock().get shouldEqual checkpointBlock }
eventually { blockchain.getBestBlock().get shouldEqual checkpointBlock }
eventually { blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock5.header.number + 1 }
}

it should "ask BlockFetcher to resolve missing node" in {
val parent = blockchain.getBestBlock().get
val newBlock: Block = getBlock(genesisBlock.number + 5, difficulty = 104, parent = parent.header.hash)
val invalidBlock = newBlock.copy(header = newBlock.header.copy(beneficiary = Address(111).bytes))

val ledger = new TestLedgerImpl(successValidators)
val blockImporter = system.actorOf(
BlockImporter.props(
fetcherProbe.ref,
ledger,
blockchain,
syncConfig,
ommersPoolProbe.ref,
broadcasterProbe.ref,
pendingTransactionsManagerProbe.ref,
supervisor.ref
)
)

blockImporter ! BlockImporter.Start
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(List(invalidBlock)))

eventually {
Thread.sleep(1000); blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock5.header.number + 1

val msg = fetcherProbe.fishForMessage(Timeouts.longTimeout) {
case BlockFetcher.FetchStateNode(_) => true
case _ => false
}.asInstanceOf[BlockFetcher.FetchStateNode]

msg.hash.length should be > 0
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ class BlockImporter(
case DuplicateBlock | BlockEnqueued =>
tryImportBlocks(restOfBlocks, importedBlocks)

case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
Task.now((importedBlocks, Some(missingNodeException)))

case BlockImportFailedDueToMissingNode(missingNodeException) =>
Task.raiseError(missingNodeException)

case err @ (UnknownParent | BlockImportFailed(_)) =>
log.error(
"Block {} import failed, with hash {} and parent hash {}",
Expand All @@ -217,10 +223,6 @@ class BlockImporter(
)
Task.now((importedBlocks, Some(err)))
}
.onErrorHandle {
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
(importedBlocks, Some(missingNodeEx))
}
}

private def importBlock(
Expand Down Expand Up @@ -253,18 +255,18 @@ class BlockImporter(
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, block.hasCheckpoint, internally)
case None => ()
}
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
// state node re-download will be handled when downloading headers
doLog(importMessages.missingStateNode(missingNodeException))
Running
case BlockImportFailedDueToMissingNode(missingNodeException) =>
Task.raiseError(missingNodeException)
case BlockImportFailed(error) =>
if (informFetcherOnFail) {
fetcher ! BlockFetcher.BlockImportFailed(block.number, error)
}
}
.map(_ => Running)
.recover {
case missingNodeEx: MissingNodeException if syncConfig.redownloadMissingStateNodes =>
// state node re-download will be handled when downloading headers
doLog(importMessages.missingStateNode(missingNodeEx))
Running
}
},
blockImportType
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ sealed abstract class ImportMessages(block: Block) {
case UnknownParent => orphaned()
case ChainReorganised(_, newBranch, _) => reorganisedChain(newBranch)
case BlockImportFailed(error) => importFailed(error)
case BlockImportFailedDueToMissingNode(reason) => missingStateNode(reason)
}
}

Expand Down
13 changes: 11 additions & 2 deletions src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package io.iohk.ethereum.ledger
import akka.util.ByteString
import io.iohk.ethereum.consensus.validators.BlockHeaderError.HeaderParentNotFoundError
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger.BlockExecutionError.ValidationBeforeExecError
import io.iohk.ethereum.ledger.BlockExecutionError.{MPTError, ValidationBeforeExecError}
import io.iohk.ethereum.ledger.BlockQueue.Leaf
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
import io.iohk.ethereum.utils.{ByteStringUtils, Logger}
import monix.eval.Task
import monix.execution.Scheduler
Expand Down Expand Up @@ -57,6 +58,9 @@ class BlockImport(
case None =>
BlockImportedToTop(importedBlocks)

case Some(MPTError(reason)) if reason.isInstanceOf[MissingNodeException] =>
BlockImportFailedDueToMissingNode(reason.asInstanceOf[MissingNodeException])

case Some(error) if importedBlocks.isEmpty =>
blockQueue.removeSubtree(block.header.hash)
BlockImportFailed(error.toString)
Expand Down Expand Up @@ -164,7 +168,10 @@ class BlockImport(
reorgResult match {
case Some(execResult) =>
execResult.fold(
err => BlockImportFailed(s"Error while trying to reorganise chain: $err"),
{
case MPTError(reason: MissingNodeException) => BlockImportFailedDueToMissingNode(reason)
case err => BlockImportFailed(s"Error while trying to reorganise chain: $err")
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could also be written as:

          {
            case MPTError(reason: MissingNodeException) => BlockImportFailedDueToMissingNode(reason)
            case err => BlockImportFailed(s"Error while trying to reorganise chain: $err")
          },

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ChainReorganised.tupled
)

Expand Down Expand Up @@ -274,4 +281,6 @@ case class ChainReorganised(

case class BlockImportFailed(error: String) extends BlockImportResult

case class BlockImportFailedDueToMissingNode(reason: MissingNodeException) extends BlockImportResult

case object UnknownParent extends BlockImportResult
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class RegularSyncSpec
"fetching state node" should {
abstract class MissingStateNodeFixture(system: ActorSystem) extends Fixture(system) {
val failingBlock: Block = testBlocksChunked.head.head
ledger.setImportResult(failingBlock, Task.raiseError(new MissingNodeException(failingBlock.hash)))
ledger.setImportResult(failingBlock, Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))
}

"blacklist peer which returns empty response" in sync(new MissingStateNodeFixture(testSystem) {
Expand Down Expand Up @@ -366,7 +366,7 @@ class RegularSyncSpec
(ledger
.importBlock(_: Block)(_: Scheduler))
.when(*, *)
.returns(Task.raiseError(new MissingNodeException(failingBlock.hash)))
.returns(Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))))

var saveNodeWasCalled: Boolean = false
val nodeData = List(ByteString(failingBlock.header.toBytes: Array[Byte]))
Expand Down