Skip to content

Commit e52683f

Browse files
authored
Merge pull request #930 from input-output-hk/checkpoint-fixes
Checkpoint fixes
2 parents 649f372 + 819821f commit e52683f

File tree

6 files changed

+69
-31
lines changed

6 files changed

+69
-31
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
1313
AwaitingBodiesToBeIgnored,
1414
AwaitingHeadersToBeIgnored
1515
}
16-
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NewCheckpointBlock, NotOnTop, OnTop}
16+
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
1717
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
1818
import io.iohk.ethereum.crypto.kec256
1919
import io.iohk.ethereum.domain._
@@ -241,12 +241,16 @@ class BlockFetcher(
241241
state.tryInsertBlock(block, peerId) match {
242242
case Left(_) if block.number <= state.lastBlock =>
243243
log.debug(
244-
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} is older than current last block ${state.lastBlock}"
244+
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} is older than current last block ${state.lastBlock}" +
245+
s" - clearing the queues and putting checkpoint to ready blocks queue"
245246
)
246-
state.importer ! NewCheckpointBlock(block, peerId)
247+
val newState = state
248+
.clearQueues()
249+
.enqueueReadyBlock(block, peerId)
250+
fetchBlocks(newState)
247251
case Left(_) if block.number <= state.knownTop =>
248252
log.debug(
249-
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} not fit into queues - clearing the queues and setting new top"
253+
s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} not fit into queues - clearing the queues and setting possible new top"
250254
)
251255
val newState = state
252256
.clearQueues()
@@ -274,8 +278,7 @@ class BlockFetcher(
274278
//keep fetcher state updated in case new checkpoint block or mined block was imported
275279
case InternalLastBlockImport(blockNr) =>
276280
log.debug(s"New last block $blockNr imported from the inside")
277-
val newLastBlock = blockNr.max(state.lastBlock)
278-
val newState = state.withLastBlock(newLastBlock).withPossibleNewTopAt(blockNr)
281+
val newState = state.withLastBlock(blockNr).withPossibleNewTopAt(blockNr)
279282

280283
fetchBlocks(newState)
281284
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ case class BlockFetcherState(
113113
}
114114

115115
/**
116-
* Validates received headers consistency and their compatibilty with the state
116+
* Validates received headers consistency and their compatibility with the state
117117
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
118118
*/
119119
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
@@ -123,11 +123,19 @@ case class BlockFetcherState(
123123
headers
124124
.asRight[String]
125125
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
126+
.ensure("Given headers should form a sequence with ready blocks")(checkConsistencyWithReadyBlocks)
126127
.ensure("Given headers do not form a chain with already stored ones")(headers =>
127128
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
128129
)
129130
}
130131

132+
private def checkConsistencyWithReadyBlocks(headers: Seq[BlockHeader]): Boolean = {
133+
(readyBlocks, headers) match {
134+
case (_ :+ last, head +: _) if waitingHeaders.isEmpty => last.header isParentOf head
135+
case _ => true
136+
}
137+
}
138+
131139
def validateNewBlockHashes(hashes: Seq[BlockHash]): Either[String, Seq[BlockHash]] =
132140
hashes
133141
.asRight[String]
@@ -190,18 +198,21 @@ case class BlockFetcherState(
190198
def enqueueRequestedBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
191199
waitingHeaders.dequeueOption
192200
.map { case (waitingHeader, waitingHeadersTail) =>
193-
if (waitingHeader.hash == block.hash)
194-
withPeerForBlocks(fromPeer, Seq(block.number))
201+
if (waitingHeader.hash == block.hash) {
202+
enqueueReadyBlock(block, fromPeer)
195203
.withPossibleNewTopAt(block.number)
196204
.copy(
197-
readyBlocks = readyBlocks.enqueue(block),
198205
waitingHeaders = waitingHeadersTail
199206
)
200-
else
207+
} else
201208
this
202209
}
203210
.getOrElse(this)
204211

212+
def enqueueReadyBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
213+
withPeerForBlocks(fromPeer, Seq(block.number))
214+
.copy(readyBlocks = readyBlocks.enqueue(block))
215+
205216
def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] =
206217
if (readyBlocks.nonEmpty) {
207218
val (picked, rest) = readyBlocks.splitAt(amount)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ class BlockImporter(
9191
}
9292
}
9393

94-
case NewCheckpointBlock(block, peerId) => importNewBlock(block, peerId, state)
95-
9694
case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)
9795

9896
case ImportDone(newBehavior, importType) =>
@@ -389,7 +387,6 @@ object BlockImporter {
389387
case object NotOnTop extends ImporterMsg
390388
case class MinedBlock(block: Block) extends ImporterMsg
391389
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends ImporterMsg
392-
case class NewCheckpointBlock(block: Block, peerId: PeerId) extends ImporterMsg
393390
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
394391
case class ImportDone(newBehavior: NewBehavior, blockImportType: BlockImportType) extends ImporterMsg
395392
case object PickBlocks extends ImporterMsg

src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import monix.eval.Task
1010
import monix.execution.Scheduler
1111
import org.bouncycastle.util.encoders.Hex
1212

13+
import scala.annotation.tailrec
1314
import scala.concurrent.ExecutionContext
1415

1516
class BlockImport(
@@ -152,7 +153,7 @@ class BlockImport(
152153
bestNumber,
153154
ByteStringUtils.hash2string(parentHash)
154155
)
155-
val oldBlocksData = removeBlocksUntil(parentHash, bestNumber).reverse
156+
val oldBlocksData = removeBlocksUntil(parentHash, bestNumber)
156157
oldBlocksData.foreach(block => blockQueue.enqueueBlock(block.block))
157158
handleBlockExecResult(newBranch, parentWeight, oldBlocksData)
158159
}
@@ -226,26 +227,31 @@ class BlockImport(
226227
* @return the list of removed blocks along with receipts and total difficulties
227228
*/
228229
private def removeBlocksUntil(parent: ByteString, fromNumber: BigInt): List[BlockData] = {
229-
blockchain.getBlockByNumber(fromNumber) match {
230-
case Some(block) if block.header.hash == parent || fromNumber == 0 =>
231-
Nil
230+
@tailrec
231+
def removeBlocksUntil(parent: ByteString, fromNumber: BigInt, acc: List[BlockData]): List[BlockData] = {
232+
blockchain.getBlockByNumber(fromNumber) match {
233+
case Some(block) if block.header.hash == parent || fromNumber == 0 =>
234+
acc
232235

233-
case Some(block) =>
234-
val hash = block.header.hash
236+
case Some(block) =>
237+
val hash = block.header.hash
235238

236-
val blockList = for {
237-
receipts <- blockchain.getReceiptsByHash(hash)
238-
weight <- blockchain.getChainWeightByHash(hash)
239-
} yield BlockData(block, receipts, weight) :: removeBlocksUntil(parent, fromNumber - 1)
239+
val blockDataOpt = for {
240+
receipts <- blockchain.getReceiptsByHash(hash)
241+
weight <- blockchain.getChainWeightByHash(hash)
242+
} yield BlockData(block, receipts, weight)
240243

241-
blockchain.removeBlock(hash, withState = true)
244+
blockchain.removeBlock(hash, withState = true)
242245

243-
blockList.getOrElse(Nil)
246+
removeBlocksUntil(parent, fromNumber - 1, blockDataOpt.map(_ :: acc).getOrElse(acc))
244247

245-
case None =>
246-
log.error(s"Unexpected missing block number: $fromNumber")
247-
Nil
248+
case None =>
249+
log.error(s"Unexpected missing block number: $fromNumber")
250+
acc
251+
}
248252
}
253+
254+
removeBlocksUntil(parent, fromNumber, Nil)
249255
}
250256
}
251257

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ class BlockFetcherSpec
434434
}
435435
}
436436

437-
"should inform importer when checkpoint block is older than last block" in new TestSetup {
437+
"should put checkpoint to ready blocks when checkpoint block is older than last block" in new TestSetup {
438438
startFetcher()
439439

440440
triggerFetching(10)
@@ -469,7 +469,17 @@ class BlockFetcherSpec
469469

470470
importer.expectMsg(BlockImporter.OnTop)
471471

472-
importer.expectMsg(BlockImporter.NewCheckpointBlock(checkpointBlock, fakePeer.id))
472+
// We need to wait a while in order to allow fetcher to process all the blocks
473+
system.scheduler.scheduleOnce(Timeouts.shortTimeout) {
474+
importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize))
475+
}
476+
477+
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
478+
val headers = blocks.map(_.header).toList
479+
480+
assert(HeadersSeq.areChain(headers))
481+
assert(headers.contains(checkpointBlock.header))
482+
}
473483
}
474484

475485
"should properly handle a request timeout" in new TestSetup {

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ class BlockFetcherStateSpec
6060
}
6161
assert(result.map(_.knownTop) === Right(blocks.last.number))
6262
}
63+
64+
"enqueue requested blocks fails when ready blocks is not forming a sequence with given headers" in {
65+
66+
val result = BlockFetcherState
67+
.initial(importer, validators.blockValidator, 0)
68+
.copy(readyBlocks = Queue(blocks.head))
69+
.appendHeaders(blocks.map(_.header))
70+
.map(_.handleRequestedBlocks(blocks, peer))
71+
72+
assert(result.map(_.waitingHeaders) === Left("Given headers should form a sequence with ready blocks"))
73+
}
6374
}
6475

6576
"trying to insert block into the queues" should {

0 commit comments

Comments
 (0)