Skip to content

Checkpoint fixes #930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 1, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
AwaitingBodiesToBeIgnored,
AwaitingHeadersToBeIgnored
}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NewCheckpointBlock, NotOnTop, OnTop}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain._
Expand Down Expand Up @@ -241,12 +241,16 @@ class BlockFetcher(
state.tryInsertBlock(block, peerId) match {
case Left(_) if block.number <= state.lastBlock =>
log.debug(
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} is older than current last block ${state.lastBlock}"
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} is older than current last block ${state.lastBlock}" +
s" - clearing the queues and putting checkpoint to ready blocks queue"
)
state.importer ! NewCheckpointBlock(block, peerId)
val newState = state
.clearQueues()
.enqueueReadyBlock(block, peerId)
fetchBlocks(newState)
case Left(_) if block.number <= state.knownTop =>
log.debug(
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} not fit into queues - clearing the queues and setting new top"
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} not fit into queues - clearing the queues and setting possible new top"
)
val newState = state
.clearQueues()
Expand Down Expand Up @@ -274,8 +278,7 @@ class BlockFetcher(
//keep fetcher state updated in case new checkpoint block or mined block was imported
case InternalLastBlockImport(blockNr) =>
log.debug(s"New last block $blockNr imported from the inside")
val newLastBlock = blockNr.max(state.lastBlock)
val newState = state.withLastBlock(newLastBlock).withPossibleNewTopAt(blockNr)
val newState = state.withLastBlock(blockNr).withPossibleNewTopAt(blockNr)

fetchBlocks(newState)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class BlockFetcherState(
}

/**
* Validates received headers consistency and their compatibilty with the state
* Validates received headers consistency and their compatibility with the state
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
*/
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
Expand All @@ -123,11 +123,19 @@ case class BlockFetcherState(
headers
.asRight[String]
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
.ensure("Given headers should form a sequence with ready blocks")(checkConsistencyWithReadyBlocks)
.ensure("Given headers do not form a chain with already stored ones")(headers =>
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
)
}

private def checkConsistencyWithReadyBlocks(headers: Seq[BlockHeader]): Boolean = {
(readyBlocks, headers) match {
case (_ :+ last, head +: _) if waitingHeaders.isEmpty => last.header isParentOf head
case _ => true
}
}

def validateNewBlockHashes(hashes: Seq[BlockHash]): Either[String, Seq[BlockHash]] =
hashes
.asRight[String]
Expand Down Expand Up @@ -190,18 +198,21 @@ case class BlockFetcherState(
def enqueueRequestedBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
waitingHeaders.dequeueOption
.map { case (waitingHeader, waitingHeadersTail) =>
if (waitingHeader.hash == block.hash)
withPeerForBlocks(fromPeer, Seq(block.number))
if (waitingHeader.hash == block.hash) {
enqueueReadyBlock(block, fromPeer)
.withPossibleNewTopAt(block.number)
.copy(
readyBlocks = readyBlocks.enqueue(block),
waitingHeaders = waitingHeadersTail
)
else
} else
this
}
.getOrElse(this)

def enqueueReadyBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
withPeerForBlocks(fromPeer, Seq(block.number))
.copy(readyBlocks = readyBlocks.enqueue(block))

def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] =
if (readyBlocks.nonEmpty) {
val (picked, rest) = readyBlocks.splitAt(amount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ class BlockImporter(
}
}

case NewCheckpointBlock(block, peerId) => importNewBlock(block, peerId, state)

case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)

case ImportDone(newBehavior, importType) =>
Expand Down Expand Up @@ -389,7 +387,6 @@ object BlockImporter {
case object NotOnTop extends ImporterMsg
case class MinedBlock(block: Block) extends ImporterMsg
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends ImporterMsg
case class NewCheckpointBlock(block: Block, peerId: PeerId) extends ImporterMsg
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
case class ImportDone(newBehavior: NewBehavior, blockImportType: BlockImportType) extends ImporterMsg
case object PickBlocks extends ImporterMsg
Expand Down
36 changes: 21 additions & 15 deletions src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import monix.eval.Task
import monix.execution.Scheduler
import org.bouncycastle.util.encoders.Hex

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext

class BlockImport(
Expand Down Expand Up @@ -152,7 +153,7 @@ class BlockImport(
bestNumber,
ByteStringUtils.hash2string(parentHash)
)
val oldBlocksData = removeBlocksUntil(parentHash, bestNumber).reverse
val oldBlocksData = removeBlocksUntil(parentHash, bestNumber)
oldBlocksData.foreach(block => blockQueue.enqueueBlock(block.block))
handleBlockExecResult(newBranch, parentWeight, oldBlocksData)
}
Expand Down Expand Up @@ -226,26 +227,31 @@ class BlockImport(
* @return the list of removed blocks along with receipts and total difficulties
*/
private def removeBlocksUntil(parent: ByteString, fromNumber: BigInt): List[BlockData] = {
blockchain.getBlockByNumber(fromNumber) match {
case Some(block) if block.header.hash == parent || fromNumber == 0 =>
Nil
@tailrec
def removeBlocksUntil(parent: ByteString, fromNumber: BigInt, acc: List[BlockData]): List[BlockData] = {
blockchain.getBlockByNumber(fromNumber) match {
case Some(block) if block.header.hash == parent || fromNumber == 0 =>
acc

case Some(block) =>
val hash = block.header.hash
case Some(block) =>
val hash = block.header.hash

val blockList = for {
receipts <- blockchain.getReceiptsByHash(hash)
weight <- blockchain.getChainWeightByHash(hash)
} yield BlockData(block, receipts, weight) :: removeBlocksUntil(parent, fromNumber - 1)
val blockDataOpt = for {
receipts <- blockchain.getReceiptsByHash(hash)
weight <- blockchain.getChainWeightByHash(hash)
} yield BlockData(block, receipts, weight)

blockchain.removeBlock(hash, withState = true)
blockchain.removeBlock(hash, withState = true)

blockList.getOrElse(Nil)
removeBlocksUntil(parent, fromNumber - 1, blockDataOpt.map(_ :: acc).getOrElse(acc))

case None =>
log.error(s"Unexpected missing block number: $fromNumber")
Nil
case None =>
log.error(s"Unexpected missing block number: $fromNumber")
acc
}
}

removeBlocksUntil(parent, fromNumber, Nil)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ class BlockFetcherSpec
}
}

"should inform importer when checkpoint block is older than last block" in new TestSetup {
"should put checkpoint to ready blocks when checkpoint block is older than last block" in new TestSetup {
startFetcher()

triggerFetching(10)
Expand Down Expand Up @@ -469,7 +469,17 @@ class BlockFetcherSpec

importer.expectMsg(BlockImporter.OnTop)

importer.expectMsg(BlockImporter.NewCheckpointBlock(checkpointBlock, fakePeer.id))
// We need to wait a while in order to allow fetcher to process all the blocks
system.scheduler.scheduleOnce(Timeouts.shortTimeout) {
importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize))
}

importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
val headers = blocks.map(_.header).toList

assert(HeadersSeq.areChain(headers))
assert(headers.contains(checkpointBlock.header))
}
}

"should properly handle a request timeout" in new TestSetup {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ class BlockFetcherStateSpec
}
assert(result.map(_.knownTop) === Right(blocks.last.number))
}

"enqueue requested blocks fails when ready blocks is not forming a sequence with given headers" in {

val result = BlockFetcherState
.initial(importer, validators.blockValidator, 0)
.copy(readyBlocks = Queue(blocks.head))
.appendHeaders(blocks.map(_.header))
.map(_.handleRequestedBlocks(blocks, peer))

assert(result.map(_.waitingHeaders) === Left("Given headers should form a sequence with ready blocks"))
}
}

"trying to insert block into the queues" should {
Expand Down