Skip to content

Commit f6e7cb0

Browse files
author
Nicolas Tallar
committed
[ETCM-377] Fix consistency issue between cache and persisted block number on rollbacks
1 parent ac691fd commit f6e7cb0

File tree

2 files changed

+178
-82
lines changed

2 files changed

+178
-82
lines changed

src/main/scala/io/iohk/ethereum/domain/Blockchain.scala

Lines changed: 68 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,12 @@ class BlockchainImpl(
224224
// There is always only one writer thread (ensured by actor), but can by many readers (api calls)
225225
// to ensure visibility of writes, needs to be volatile or atomic ref
226226
private val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] =
227-
new AtomicReference(BestBlockLatestCheckpointNumbers(BigInt(0), BigInt(0)))
227+
new AtomicReference(
228+
BestBlockLatestCheckpointNumbers(
229+
appStateStorage.getBestBlockNumber(),
230+
appStateStorage.getLatestCheckpointBlockNumber()
231+
)
232+
)
228233

229234
override def getBlockHeaderByHash(hash: ByteString): Option[BlockHeader] =
230235
blockHeadersStorage.get(hash)
@@ -246,22 +251,15 @@ class BlockchainImpl(
246251
bestSavedBlockNumber,
247252
bestKnownBlockNumber
248253
)
249-
if (bestKnownBlockNumber > bestSavedBlockNumber)
250-
bestKnownBlockNumber
251-
else
252-
bestSavedBlockNumber
253-
}
254254

255-
override def getLatestCheckpointBlockNumber(): BigInt = {
256-
val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber()
257-
// The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache.
258-
// The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet
259-
if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage)
260-
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
261-
else
262-
latestCheckpointNumberInStorage
255+
// The cached best block number should always be more up-to-date than the one on disk, we are keeping access to disk
256+
// above only for logging purposes
257+
bestKnownBlockNumber
263258
}
264259

260+
override def getLatestCheckpointBlockNumber(): BigInt =
261+
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
262+
265263
override def getBestBlock(): Block = {
266264
val bestBlockNumber = getBestBlockNumber()
267265
log.debug("Trying to get best block with number {}", bestBlockNumber)
@@ -313,10 +311,6 @@ class BlockchainImpl(
313311
.and(storeChainWeight(block.header.hash, weight))
314312
.commit()
315313

316-
// not transactional part
317-
// the best blocks data will be persisted only when the cache will be persisted
318-
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
319-
320314
if (saveAsBestBlock && block.hasCheckpoint) {
321315
log.debug(
322316
"New best known block block number - {}, new best checkpoint number - {}",
@@ -331,6 +325,10 @@ class BlockchainImpl(
331325
)
332326
saveBestKnownBlock(block.header.number)
333327
}
328+
329+
// not transactional part
330+
// the best blocks data will be persisted only when the cache will be persisted
331+
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
334332
}
335333

336334
override def storeBlockHeader(blockHeader: BlockHeader): DataSourceBatchUpdate = {
@@ -388,87 +386,77 @@ class BlockchainImpl(
388386
blockNumberMappingStorage.remove(number)
389387
}
390388

391-
// scalastyle:off method.length
392389
override def removeBlock(blockHash: ByteString, withState: Boolean): Unit = {
393-
val maybeBlockHeader = getBlockHeaderByHash(blockHash)
394-
395-
log.debug(
396-
"Trying to remove block with hash {} and number {}",
397-
ByteStringUtils.hash2string(blockHash),
398-
maybeBlockHeader.map(_.number)
399-
)
390+
val maybeBlock = getBlockByHash(blockHash)
400391

401-
val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList)
402-
val bestBlocks = bestKnownBlockAndLatestCheckpoint.get()
403-
// as we are decreasing block numbers in memory more often than in storage,
404-
// we can't use here getBestBlockNumber / getLatestCheckpointBlockNumber
405-
val bestBlockNumber =
406-
if (bestBlocks.bestBlockNumber != 0) bestBlocks.bestBlockNumber else appStateStorage.getBestBlockNumber()
407-
val latestCheckpointNumber = {
408-
if (bestBlocks.latestCheckpointNumber != 0) bestBlocks.latestCheckpointNumber
409-
else appStateStorage.getLatestCheckpointBlockNumber()
392+
maybeBlock match {
393+
case Some(block) => removeBlock(block, withState)
394+
case None =>
395+
log.warn(s"Attempted removing block with hash $blockHash that we don't have")
410396
}
397+
}
411398

412-
val blockNumberMappingUpdates = {
413-
maybeBlockHeader.fold(blockNumberMappingStorage.emptyBatchUpdate)(h =>
414-
if (getHashByBlockNumber(h.number).contains(blockHash))
415-
removeBlockNumberMapping(h.number)
416-
else blockNumberMappingStorage.emptyBatchUpdate
417-
)
418-
}
399+
// scalastyle:off method.length
400+
private def removeBlock(block: Block, withState: Boolean): Unit = {
401+
val blockHash = block.hash
419402

420-
val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate, Option[BigInt]) = maybeBlockHeader match {
421-
case Some(header) =>
422-
if (header.hasCheckpoint && header.number == latestCheckpointNumber) {
423-
val prev = findPreviousCheckpointBlockNumber(header.number, header.number)
424-
prev
425-
.map { num =>
426-
(appStateStorage.putLatestCheckpointBlockNumber(num), Some(num))
427-
}
428-
.getOrElse {
429-
(appStateStorage.removeLatestCheckpointBlockNumber(), Some(0))
430-
}
431-
} else (appStateStorage.emptyBatchUpdate, None)
432-
case None =>
433-
(appStateStorage.emptyBatchUpdate, None)
434-
}
403+
log.debug(s"Trying to remove block block ${block.idTag}")
435404

436-
val newBestBlockNumber: BigInt = if (bestBlockNumber >= 1) bestBlockNumber - 1 else 0
405+
val txList = block.body.transactionList
406+
val bestBlockNumber = getBestBlockNumber()
407+
val latestCheckpointNumber = getLatestCheckpointBlockNumber()
408+
409+
val blockNumberMappingUpdates =
410+
if (getHashByBlockNumber(block.number).contains(blockHash))
411+
removeBlockNumberMapping(block.number)
412+
else blockNumberMappingStorage.emptyBatchUpdate
413+
414+
val newBestBlockNumber: BigInt = (bestBlockNumber - 1).max(0)
415+
val newLatestCheckpointNumber: BigInt =
416+
if (block.hasCheckpoint && block.number == latestCheckpointNumber) {
417+
findPreviousCheckpointBlockNumber(block.number, block.number)
418+
} else latestCheckpointNumber
419+
420+
// Block number updates are only done if the persisted value is larger, as we won't have the associated mpt nodes if not
421+
val bestBlockNumberUpdates =
422+
if (appStateStorage.getBestBlockNumber() > newBestBlockNumber)
423+
appStateStorage.putBestBlockNumber(newBestBlockNumber)
424+
else appStateStorage.emptyBatchUpdate
425+
426+
// Checkpoint number updates are only done if the persisted value is larger, as we won't have the associated mpt nodes if not
427+
val latestCheckpointNumberUpdates =
428+
if (appStateStorage.getLatestCheckpointBlockNumber() > newLatestCheckpointNumber)
429+
appStateStorage.putLatestCheckpointBlockNumber(newLatestCheckpointNumber)
430+
else appStateStorage.emptyBatchUpdate
431+
432+
log.debug(
433+
"Persisting block info data into database. Persisted block number is {}. Persisted checkpoint number is {}",
434+
newBestBlockNumber,
435+
newLatestCheckpointNumber
436+
)
437437

438438
blockHeadersStorage
439439
.remove(blockHash)
440440
.and(blockBodiesStorage.remove(blockHash))
441441
.and(chainWeightStorage.remove(blockHash))
442442
.and(receiptStorage.remove(blockHash))
443-
.and(maybeTxList.fold(transactionMappingStorage.emptyBatchUpdate)(removeTxsLocations))
443+
.and(removeTxsLocations(txList))
444444
.and(blockNumberMappingUpdates)
445+
.and(bestBlockNumberUpdates)
446+
.and(latestCheckpointNumberUpdates)
445447
.commit()
446448

447449
// not transactional part
448-
saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber)
450+
saveBestKnownBlocks(newBestBlockNumber, Some(newLatestCheckpointNumber))
449451
log.debug(
450452
"Removed block with hash {}. New best block number - {}, new best checkpoint block number - {}",
451453
ByteStringUtils.hash2string(blockHash),
452454
newBestBlockNumber,
453-
prevCheckpointNumber
455+
newLatestCheckpointNumber
454456
)
455457

456-
maybeBlockHeader.foreach { h =>
457-
if (withState) {
458-
val bestBlocksUpdates = appStateStorage
459-
.putBestBlockNumber(newBestBlockNumber)
460-
.and(checkpointUpdates)
461-
stateStorage.onBlockRollback(h.number, bestBlockNumber) { () =>
462-
log.debug(
463-
"Persisting block info data into database. Persisted block number is {}. " +
464-
"Persisted checkpoint number is {}",
465-
newBestBlockNumber,
466-
prevCheckpointNumber
467-
)
468-
bestBlocksUpdates.commit()
469-
}
470-
}
471-
}
458+
if (withState)
459+
stateStorage.onBlockRollback(block.number, bestBlockNumber) { () => persistBestBlocksData() }
472460
}
473461
// scalastyle:on method.length
474462

@@ -485,7 +473,7 @@ class BlockchainImpl(
485473
private def findPreviousCheckpointBlockNumber(
486474
blockNumberToCheck: BigInt,
487475
latestCheckpointBlockNumber: BigInt
488-
): Option[BigInt] = {
476+
): BigInt = {
489477
if (blockNumberToCheck > 0) {
490478
val maybePreviousCheckpointBlockNumber = for {
491479
currentBlock <- getBlockByNumber(blockNumberToCheck)
@@ -494,10 +482,10 @@ class BlockchainImpl(
494482
} yield currentBlock.number
495483

496484
maybePreviousCheckpointBlockNumber match {
497-
case Some(_) => maybePreviousCheckpointBlockNumber
485+
case Some(previousCheckpointBlockNumber) => previousCheckpointBlockNumber
498486
case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1, latestCheckpointBlockNumber)
499487
}
500-
} else None
488+
} else 0
501489
}
502490

503491
private def saveTxsLocations(blockHash: ByteString, blockBody: BlockBody): DataSourceBatchUpdate =

src/test/scala/io/iohk/ethereum/domain/BlockchainSpec.scala

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ import io.iohk.ethereum.db.dataSource.EphemDataSource
77
import io.iohk.ethereum.db.storage.StateStorage
88
import io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefPostEcip1097
99
import io.iohk.ethereum.mpt.MerklePatriciaTrie
10-
import io.iohk.ethereum.{Fixtures, ObjectGenerators}
10+
import io.iohk.ethereum.{BlockHelpers, Fixtures, ObjectGenerators}
11+
import io.iohk.ethereum.ObjectGenerators._
12+
import org.scalacheck.Gen
13+
import org.scalamock.scalatest.MockFactory
1114
import org.scalatest.flatspec.AnyFlatSpec
1215
import org.scalatest.matchers.should.Matchers
16+
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
1317

14-
class BlockchainSpec extends AnyFlatSpec with Matchers {
18+
class BlockchainSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyChecks {
1519

1620
val checkpoint = ObjectGenerators.fakeCheckpointGen(2, 5).sample.get
1721
val checkpointBlockGenerator = new CheckpointBlockGenerator
@@ -127,4 +131,108 @@ class BlockchainSpec extends AnyFlatSpec with Matchers {
127131
val retrievedAccount = blockchain.getAccount(address, headerWithAcc.number)
128132
retrievedAccount shouldEqual Some(account)
129133
}
134+
135+
it should "return correct best block number after applying and rollbacking blocks" in new TestSetup {
136+
forAll(intGen(min = 1: Int, max = maxNumberBlocksToImport)) { numberBlocksToImport =>
137+
val testSetup = newSetup()
138+
import testSetup._
139+
140+
// Import blocks
141+
val blocksToImport = BlockHelpers.generateChain(numberBlocksToImport, Fixtures.Blocks.Genesis.block)
142+
143+
// Randomly select the block import to persist (empty means no persistance)
144+
val blockImportToPersist = Gen.option(Gen.oneOf(blocksToImport)).sample.get
145+
(stubStateStorage
146+
.onBlockSave(_: BigInt, _: BigInt)(_: () => Unit))
147+
.when(*, *, *)
148+
.onCall((bn, _, persistFn) => {
149+
if (blockImportToPersist.exists(_.number == bn)) persistFn()
150+
})
151+
152+
blocksToImport.foreach { block =>
153+
blockchainWithStubPersisting.save(block, Nil, ChainWeight.zero, true)
154+
}
155+
156+
blockchainWithStubPersisting.getBestBlockNumber() shouldBe blocksToImport.last.number
157+
blockchainStoragesWithStubPersisting.appStateStorage.getBestBlockNumber() shouldBe blockImportToPersist.fold(0: BigInt)(_.number)
158+
159+
160+
// Rollback blocks
161+
val numberBlocksToRollback = intGen(0, numberBlocksToImport).sample.get
162+
val (blocksNotRollbacked, blocksToRollback) = blocksToImport.splitAt(numberBlocksToRollback)
163+
164+
// Randomly select the block rollback to persist (empty means no persistance)
165+
val blockRollbackToPersist = if (blocksToRollback.isEmpty) None else Gen.option(Gen.oneOf(blocksToRollback)).sample.get
166+
(stubStateStorage
167+
.onBlockRollback(_: BigInt, _: BigInt)(_: () => Unit))
168+
.when(*, *, *)
169+
.onCall((bn, _, persistFn) => {
170+
if (blockRollbackToPersist.exists(_.number == bn)) persistFn()
171+
})
172+
173+
blocksToRollback.reverse.foreach { block =>
174+
blockchainWithStubPersisting.removeBlock(block.hash, true)
175+
}
176+
177+
val expectedMemoryBestBlock = blocksNotRollbacked.lastOption.fold(0: BigInt)(_.number)
178+
val expectedPersistedBestBlock = calculatePersistedBestBlock(
179+
blockImportToPersist.map(_.number),
180+
blockRollbackToPersist.map(_.number),
181+
blocksToRollback.map(_.number)
182+
)
183+
blockchainWithStubPersisting.getBestBlockNumber() shouldBe expectedMemoryBestBlock
184+
blockchainStoragesWithStubPersisting.appStateStorage.getBestBlockNumber() shouldBe expectedPersistedBestBlock
185+
}
186+
}
187+
188+
trait TestSetup extends MockFactory {
189+
val maxNumberBlocksToImport: Int = 30
190+
191+
def calculatePersistedBestBlock(blockImportPersisted: Option[BigInt], blockRollbackPersisted: Option[BigInt], blocksRollbacked: Seq[BigInt]): BigInt = {
192+
(blocksRollbacked, blockImportPersisted) match {
193+
case (Nil, Some(bi)) =>
194+
// No blocks rollbacked, last persist was the persist during import
195+
bi
196+
case (nonEmptyRollbackedBlocks, Some(bi)) =>
197+
// Last forced persist during apply/rollback
198+
val maxForcedPersist = blockRollbackPersisted.fold(bi){ br => (br - 1).max(bi)}
199+
200+
// The above number would have been decreased by any rollbacked blocks
201+
(nonEmptyRollbackedBlocks.head - 1).min(maxForcedPersist)
202+
case (_, None) =>
203+
// If persisted rollback, then it was decreased by the future rollbacks, if not no persistance was ever done
204+
blockRollbackPersisted.fold(0: BigInt)(_ => blocksRollbacked.head - 1)
205+
}
206+
}
207+
208+
trait StubPersistingBlockchainSetup {
209+
def stubStateStorage: StateStorage
210+
def blockchainStoragesWithStubPersisting: BlockchainStorages
211+
def blockchainWithStubPersisting: BlockchainImpl
212+
}
213+
214+
def newSetup(): StubPersistingBlockchainSetup = {
215+
new StubPersistingBlockchainSetup with EphemBlockchainTestSetup {
216+
override val stubStateStorage = stub[StateStorage]
217+
override val blockchainStoragesWithStubPersisting = new BlockchainStorages {
218+
val blockHeadersStorage = storagesInstance.storages.blockHeadersStorage
219+
val blockBodiesStorage = storagesInstance.storages.blockBodiesStorage
220+
val blockNumberMappingStorage = storagesInstance.storages.blockNumberMappingStorage
221+
val receiptStorage = storagesInstance.storages.receiptStorage
222+
val evmCodeStorage = storagesInstance.storages.evmCodeStorage
223+
val chainWeightStorage = storagesInstance.storages.chainWeightStorage
224+
val transactionMappingStorage = storagesInstance.storages.transactionMappingStorage
225+
val nodeStorage = storagesInstance.storages.nodeStorage
226+
val pruningMode = storagesInstance.storages.pruningMode
227+
val appStateStorage = storagesInstance.storages.appStateStorage
228+
val cachedNodeStorage = storagesInstance.storages.cachedNodeStorage
229+
val stateStorage = stubStateStorage
230+
}
231+
override val blockchainWithStubPersisting = BlockchainImpl(blockchainStoragesWithStubPersisting)
232+
233+
blockchainWithStubPersisting.storeBlock(Fixtures.Blocks.Genesis.block)
234+
}
235+
}
236+
237+
}
130238
}

0 commit comments

Comments
 (0)