Skip to content

Commit dcb6d7b

Browse files
author
Nicolás Tallar
authored
[ETCM-377] Fix consistency issue between cache and persisted block number on rollbacks (#800)
1 parent d6251f6 commit dcb6d7b

File tree

7 files changed

+192
-104
lines changed

7 files changed

+192
-104
lines changed

src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,6 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
294294
val currentWolrd = getMptForBlock(block)
295295
val (newBlock, newWeight, _) = createChildBlock(block, currentWeight, currentWolrd)(updateWorldForBlock)
296296
bl.save(newBlock, Seq(), newWeight, saveAsBestBlock = true)
297-
bl.persistCachedNodes()
298297
broadcastBlock(newBlock, newWeight)
299298
}.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock))
300299
}

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,7 @@ class FastSync(
787787
val bestReceivedBlock = fullBlocks.maxBy(_.number)
788788
val lastStoredBestBlockNumber = appStateStorage.getBestBlockNumber()
789789
if (lastStoredBestBlockNumber < bestReceivedBlock.number) {
790+
blockchain.saveBestKnownBlocks(bestReceivedBlock.number)
790791
appStateStorage.putBestBlockNumber(bestReceivedBlock.number).commit()
791792
}
792793
syncState = syncState.copy(lastFullBlockNumber = bestReceivedBlock.number.max(lastStoredBestBlockNumber))

src/main/scala/io/iohk/ethereum/db/storage/StateStorage.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
55
import io.iohk.ethereum.db.cache.{LruCache, MapCache}
66
import io.iohk.ethereum.db.dataSource.{DataSource, EphemDataSource}
77
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
8-
import io.iohk.ethereum.db.storage.StateStorage.{FlushSituation, GenesisDataLoad, RollBackFlush}
8+
import io.iohk.ethereum.db.storage.StateStorage.{FlushSituation, GenesisDataLoad}
99
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
1010
import io.iohk.ethereum.mpt.MptNode
1111
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
@@ -119,7 +119,6 @@ class CachedReferenceCountedStateStorage(
119119
override def forcePersist(reason: FlushSituation): Boolean = {
120120
reason match {
121121
case GenesisDataLoad => CachedReferenceCountedStorage.persistCache(lruCache, nodeStorage, forced = true)
122-
case RollBackFlush => false
123122
}
124123
}
125124

@@ -194,7 +193,6 @@ object StateStorage {
194193
}
195194

196195
sealed abstract class FlushSituation
197-
case object RollBackFlush extends FlushSituation
198196
case object GenesisDataLoad extends FlushSituation
199197

200198
}

src/main/scala/io/iohk/ethereum/domain/Blockchain.scala

Lines changed: 78 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import akka.util.ByteString
66
import io.iohk.ethereum.db.dataSource.DataSourceBatchUpdate
77
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
88
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
9-
import io.iohk.ethereum.db.storage.StateStorage.RollBackFlush
109
import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation
1110
import io.iohk.ethereum.db.storage._
1211
import io.iohk.ethereum.db.storage.pruning.PruningMode
@@ -223,8 +222,14 @@ class BlockchainImpl(
223222

224223
// There is always only one writer thread (ensured by actor), but can by many readers (api calls)
225224
// to ensure visibility of writes, needs to be volatile or atomic ref
226-
private val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] =
227-
new AtomicReference(BestBlockLatestCheckpointNumbers(BigInt(0), BigInt(0)))
225+
// Laziness required for mocking BlockchainImpl on tests
226+
private lazy val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] =
227+
new AtomicReference(
228+
BestBlockLatestCheckpointNumbers(
229+
appStateStorage.getBestBlockNumber(),
230+
appStateStorage.getLatestCheckpointBlockNumber()
231+
)
232+
)
228233

229234
override def getBlockHeaderByHash(hash: ByteString): Option[BlockHeader] =
230235
blockHeadersStorage.get(hash)
@@ -246,22 +251,15 @@ class BlockchainImpl(
246251
bestSavedBlockNumber,
247252
bestKnownBlockNumber
248253
)
249-
if (bestKnownBlockNumber > bestSavedBlockNumber)
250-
bestKnownBlockNumber
251-
else
252-
bestSavedBlockNumber
253-
}
254254

255-
override def getLatestCheckpointBlockNumber(): BigInt = {
256-
val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber()
257-
// The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache.
258-
// The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet
259-
if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage)
260-
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
261-
else
262-
latestCheckpointNumberInStorage
255+
// The cached best block number should always be more up-to-date than the one on disk, we are keeping access to disk
256+
// above only for logging purposes
257+
bestKnownBlockNumber
263258
}
264259

260+
override def getLatestCheckpointBlockNumber(): BigInt =
261+
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
262+
265263
override def getBestBlock(): Block = {
266264
val bestBlockNumber = getBestBlockNumber()
267265
log.debug("Trying to get best block with number {}", bestBlockNumber)
@@ -294,7 +292,7 @@ class BlockchainImpl(
294292
val currentBestBlockNumber = getBestBlockNumber()
295293
val currentBestCheckpointNumber = getLatestCheckpointBlockNumber()
296294
log.debug(
297-
"Persisting block info data into database. Persisted block number is {}. " +
295+
"Persisting app info data into database. Persisted block number is {}. " +
298296
"Persisted checkpoint number is {}",
299297
currentBestBlockNumber,
300298
currentBestCheckpointNumber
@@ -313,10 +311,6 @@ class BlockchainImpl(
313311
.and(storeChainWeight(block.header.hash, weight))
314312
.commit()
315313

316-
// not transactional part
317-
// the best blocks data will be persisted only when the cache will be persisted
318-
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
319-
320314
if (saveAsBestBlock && block.hasCheckpoint) {
321315
log.debug(
322316
"New best known block block number - {}, new best checkpoint number - {}",
@@ -331,6 +325,10 @@ class BlockchainImpl(
331325
)
332326
saveBestKnownBlock(block.header.number)
333327
}
328+
329+
// not transactional part
330+
// the best blocks data will be persisted only when the cache will be persisted
331+
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
334332
}
335333

336334
override def storeBlockHeader(blockHeader: BlockHeader): DataSourceBatchUpdate = {
@@ -388,87 +386,83 @@ class BlockchainImpl(
388386
blockNumberMappingStorage.remove(number)
389387
}
390388

391-
// scalastyle:off method.length
392389
override def removeBlock(blockHash: ByteString, withState: Boolean): Unit = {
393-
val maybeBlockHeader = getBlockHeaderByHash(blockHash)
390+
val maybeBlock = getBlockByHash(blockHash)
394391

395-
log.debug(
396-
"Trying to remove block with hash {} and number {}",
397-
ByteStringUtils.hash2string(blockHash),
398-
maybeBlockHeader.map(_.number)
399-
)
400-
401-
val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList)
402-
val bestBlocks = bestKnownBlockAndLatestCheckpoint.get()
403-
// as we are decreasing block numbers in memory more often than in storage,
404-
// we can't use here getBestBlockNumber / getLatestCheckpointBlockNumber
405-
val bestBlockNumber =
406-
if (bestBlocks.bestBlockNumber != 0) bestBlocks.bestBlockNumber else appStateStorage.getBestBlockNumber()
407-
val latestCheckpointNumber = {
408-
if (bestBlocks.latestCheckpointNumber != 0) bestBlocks.latestCheckpointNumber
409-
else appStateStorage.getLatestCheckpointBlockNumber()
392+
maybeBlock match {
393+
case Some(block) => removeBlock(block, withState)
394+
case None =>
395+
log.warn(s"Attempted removing block with hash ${ByteStringUtils.hash2string(blockHash)} that we don't have")
410396
}
397+
}
411398

412-
val blockNumberMappingUpdates = {
413-
maybeBlockHeader.fold(blockNumberMappingStorage.emptyBatchUpdate)(h =>
414-
if (getHashByBlockNumber(h.number).contains(blockHash))
415-
removeBlockNumberMapping(h.number)
416-
else blockNumberMappingStorage.emptyBatchUpdate
417-
)
418-
}
399+
// scalastyle:off method.length
400+
private def removeBlock(block: Block, withState: Boolean): Unit = {
401+
val blockHash = block.hash
419402

420-
val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate, Option[BigInt]) = maybeBlockHeader match {
421-
case Some(header) =>
422-
if (header.hasCheckpoint && header.number == latestCheckpointNumber) {
423-
val prev = findPreviousCheckpointBlockNumber(header.number, header.number)
424-
prev
425-
.map { num =>
426-
(appStateStorage.putLatestCheckpointBlockNumber(num), Some(num))
427-
}
428-
.getOrElse {
429-
(appStateStorage.removeLatestCheckpointBlockNumber(), Some(0))
430-
}
431-
} else (appStateStorage.emptyBatchUpdate, None)
432-
case None =>
433-
(appStateStorage.emptyBatchUpdate, None)
434-
}
403+
log.debug(s"Trying to remove block block ${block.idTag}")
435404

436-
val newBestBlockNumber: BigInt = if (bestBlockNumber >= 1) bestBlockNumber - 1 else 0
405+
val txList = block.body.transactionList
406+
val bestBlockNumber = getBestBlockNumber()
407+
val latestCheckpointNumber = getLatestCheckpointBlockNumber()
408+
409+
val blockNumberMappingUpdates =
410+
if (getHashByBlockNumber(block.number).contains(blockHash))
411+
removeBlockNumberMapping(block.number)
412+
else blockNumberMappingStorage.emptyBatchUpdate
413+
414+
val newBestBlockNumber: BigInt = (bestBlockNumber - 1).max(0)
415+
val newLatestCheckpointNumber: BigInt =
416+
if (block.hasCheckpoint && block.number == latestCheckpointNumber) {
417+
findPreviousCheckpointBlockNumber(block.number, block.number)
418+
} else latestCheckpointNumber
419+
420+
/*
421+
This two below updates are an exception to the rule of only updating the best blocks when persisting the node
422+
cache.
423+
They are required in case we are removing a block that's marked on db as the best (or as the last checkpoint),
424+
to keep it's consistency, as it will no longer be the best block (nor the last checkpoint).
425+
426+
This updates can't be done if the conditions are false as we might not have the associated mpt nodes, so falling
427+
into the case of having an incomplete best block and so an inconsistent db
428+
*/
429+
val bestBlockNumberUpdates =
430+
if (appStateStorage.getBestBlockNumber() > newBestBlockNumber)
431+
appStateStorage.putBestBlockNumber(newBestBlockNumber)
432+
else appStateStorage.emptyBatchUpdate
433+
val latestCheckpointNumberUpdates =
434+
if (appStateStorage.getLatestCheckpointBlockNumber() > newLatestCheckpointNumber)
435+
appStateStorage.putLatestCheckpointBlockNumber(newLatestCheckpointNumber)
436+
else appStateStorage.emptyBatchUpdate
437+
438+
log.debug(
439+
"Persisting app info data into database. Persisted block number is {}. Persisted checkpoint number is {}",
440+
newBestBlockNumber,
441+
newLatestCheckpointNumber
442+
)
437443

438444
blockHeadersStorage
439445
.remove(blockHash)
440446
.and(blockBodiesStorage.remove(blockHash))
441447
.and(chainWeightStorage.remove(blockHash))
442448
.and(receiptStorage.remove(blockHash))
443-
.and(maybeTxList.fold(transactionMappingStorage.emptyBatchUpdate)(removeTxsLocations))
449+
.and(removeTxsLocations(txList))
444450
.and(blockNumberMappingUpdates)
451+
.and(bestBlockNumberUpdates)
452+
.and(latestCheckpointNumberUpdates)
445453
.commit()
446454

447-
// not transactional part
448-
saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber)
455+
saveBestKnownBlocks(newBestBlockNumber, Some(newLatestCheckpointNumber))
449456
log.debug(
450457
"Removed block with hash {}. New best block number - {}, new best checkpoint block number - {}",
451458
ByteStringUtils.hash2string(blockHash),
452459
newBestBlockNumber,
453-
prevCheckpointNumber
460+
newLatestCheckpointNumber
454461
)
455462

456-
maybeBlockHeader.foreach { h =>
457-
if (withState) {
458-
val bestBlocksUpdates = appStateStorage
459-
.putBestBlockNumber(newBestBlockNumber)
460-
.and(checkpointUpdates)
461-
stateStorage.onBlockRollback(h.number, bestBlockNumber) { () =>
462-
log.debug(
463-
"Persisting block info data into database. Persisted block number is {}. " +
464-
"Persisted checkpoint number is {}",
465-
newBestBlockNumber,
466-
prevCheckpointNumber
467-
)
468-
bestBlocksUpdates.commit()
469-
}
470-
}
471-
}
463+
// not transactional part
464+
if (withState)
465+
stateStorage.onBlockRollback(block.number, bestBlockNumber) { () => persistBestBlocksData() }
472466
}
473467
// scalastyle:on method.length
474468

@@ -485,7 +479,7 @@ class BlockchainImpl(
485479
private def findPreviousCheckpointBlockNumber(
486480
blockNumberToCheck: BigInt,
487481
latestCheckpointBlockNumber: BigInt
488-
): Option[BigInt] = {
482+
): BigInt = {
489483
if (blockNumberToCheck > 0) {
490484
val maybePreviousCheckpointBlockNumber = for {
491485
currentBlock <- getBlockByNumber(blockNumberToCheck)
@@ -494,10 +488,10 @@ class BlockchainImpl(
494488
} yield currentBlock.number
495489

496490
maybePreviousCheckpointBlockNumber match {
497-
case Some(_) => maybePreviousCheckpointBlockNumber
491+
case Some(previousCheckpointBlockNumber) => previousCheckpointBlockNumber
498492
case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1, latestCheckpointBlockNumber)
499493
}
500-
} else None
494+
} else 0
501495
}
502496

503497
private def saveTxsLocations(blockHash: ByteString, blockBody: BlockBody): DataSourceBatchUpdate =
@@ -549,13 +543,6 @@ class BlockchainImpl(
549543
noEmptyAccounts = noEmptyAccounts,
550544
ethCompatibleStorage = ethCompatibleStorage
551545
)
552-
553-
//FIXME EC-495 this method should not be need when best block is handled properly during rollback
554-
def persistCachedNodes(): Unit = {
555-
if (stateStorage.forcePersist(RollBackFlush)) {
556-
persistBestBlocksData()
557-
}
558-
}
559546
}
560547

561548
trait BlockchainStorages {

src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ class BlockImport(
142142
*/
143143
private def reorganiseChainFromQueue(queuedLeaf: ByteString): BlockImportResult = {
144144
log.debug("Reorganising chain from leaf {}", ByteStringUtils.hash2string(queuedLeaf))
145-
blockchain.persistCachedNodes()
146145
val newBranch = blockQueue.getBranch(queuedLeaf, dequeue = true)
147146
val bestNumber = blockchain.getBestBlockNumber()
148147

@@ -242,7 +241,6 @@ class BlockImport(
242241
weight <- blockchain.getChainWeightByHash(hash)
243242
} yield BlockData(block, receipts, weight) :: removeBlocksUntil(parent, fromNumber - 1)
244243

245-
// Not updating best block number for efficiency, it will be updated in the callers anyway
246244
blockchain.removeBlock(hash, withState = true)
247245

248246
blockList.getOrElse(Nil)

src/test/scala/io/iohk/ethereum/db/storage/ReadOnlyNodeStorageSpec.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import akka.util.ByteString
66
import io.iohk.ethereum.db.cache.{LruCache, MapCache}
77
import io.iohk.ethereum.db.dataSource.EphemDataSource
88
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
9-
import io.iohk.ethereum.db.storage.StateStorage.{GenesisDataLoad, RollBackFlush}
9+
import io.iohk.ethereum.db.storage.StateStorage.GenesisDataLoad
1010
import io.iohk.ethereum.db.storage.pruning.InMemoryPruning
1111
import io.iohk.ethereum.mpt.LeafNode
1212
import io.iohk.ethereum.utils.Config.NodeCacheConfig
@@ -40,7 +40,7 @@ class ReadOnlyNodeStorageSpec extends AnyFlatSpec with Matchers {
4040
dataSource.storage.size shouldEqual 1
4141
}
4242

43-
it should "be able to persist to underlying storage when Genesis loading and not persist durin rollback" in new TestSetup {
43+
it should "be able to persist to underlying storage when Genesis loading" in new TestSetup {
4444
val (nodeKey, nodeVal) = MptStorage.collapseNode(Some(newLeaf))._2.head
4545
val readOnlyNodeStorage = cachedStateStorage.getReadOnlyStorage
4646

@@ -53,9 +53,6 @@ class ReadOnlyNodeStorageSpec extends AnyFlatSpec with Matchers {
5353

5454
readOnlyNodeStorage.persist()
5555

56-
cachedStateStorage.forcePersist(RollBackFlush) shouldEqual false
57-
dataSource.storage.size shouldEqual 0
58-
5956
cachedStateStorage.forcePersist(GenesisDataLoad) shouldEqual true
6057
dataSource.storage.size shouldEqual 1
6158
}

0 commit comments

Comments
 (0)