Skip to content

[ETCM-377] Fix consistency issue between cache and persisted block number on rollbacks #800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 18, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
val currentWolrd = getMptForBlock(block)
val (newBlock, newWeight, _) = createChildBlock(block, currentWeight, currentWolrd)(updateWorldForBlock)
bl.save(newBlock, Seq(), newWeight, saveAsBestBlock = true)
bl.persistCachedNodes()
broadcastBlock(newBlock, newWeight)
}.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ class FastSync(
val bestReceivedBlock = fullBlocks.maxBy(_.number)
val lastStoredBestBlockNumber = appStateStorage.getBestBlockNumber()
if (lastStoredBestBlockNumber < bestReceivedBlock.number) {
blockchain.saveBestKnownBlocks(bestReceivedBlock.number)
appStateStorage.putBestBlockNumber(bestReceivedBlock.number).commit()
}
syncState = syncState.copy(lastFullBlockNumber = bestReceivedBlock.number.max(lastStoredBestBlockNumber))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
import io.iohk.ethereum.db.cache.{LruCache, MapCache}
import io.iohk.ethereum.db.dataSource.{DataSource, EphemDataSource}
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
import io.iohk.ethereum.db.storage.StateStorage.{FlushSituation, GenesisDataLoad, RollBackFlush}
import io.iohk.ethereum.db.storage.StateStorage.{FlushSituation, GenesisDataLoad}
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
import io.iohk.ethereum.mpt.MptNode
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
Expand Down Expand Up @@ -119,7 +119,6 @@ class CachedReferenceCountedStateStorage(
override def forcePersist(reason: FlushSituation): Boolean = {
reason match {
case GenesisDataLoad => CachedReferenceCountedStorage.persistCache(lruCache, nodeStorage, forced = true)
case RollBackFlush => false
}
}

Expand Down Expand Up @@ -194,7 +193,6 @@ object StateStorage {
}

sealed abstract class FlushSituation
case object RollBackFlush extends FlushSituation
case object GenesisDataLoad extends FlushSituation

}
169 changes: 78 additions & 91 deletions src/main/scala/io/iohk/ethereum/domain/Blockchain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.util.ByteString
import io.iohk.ethereum.db.dataSource.DataSourceBatchUpdate
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
import io.iohk.ethereum.db.storage.StateStorage.RollBackFlush
import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation
import io.iohk.ethereum.db.storage._
import io.iohk.ethereum.db.storage.pruning.PruningMode
Expand Down Expand Up @@ -223,8 +222,14 @@ class BlockchainImpl(

// There is always only one writer thread (ensured by actor), but can by many readers (api calls)
// to ensure visibility of writes, needs to be volatile or atomic ref
private val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] =
new AtomicReference(BestBlockLatestCheckpointNumbers(BigInt(0), BigInt(0)))
// Laziness required for mocking BlockchainImpl in tests
private lazy val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] =
new AtomicReference(
BestBlockLatestCheckpointNumbers(
appStateStorage.getBestBlockNumber(),
appStateStorage.getLatestCheckpointBlockNumber()
)
)

override def getBlockHeaderByHash(hash: ByteString): Option[BlockHeader] =
blockHeadersStorage.get(hash)
Expand All @@ -246,22 +251,15 @@ class BlockchainImpl(
bestSavedBlockNumber,
bestKnownBlockNumber
)
if (bestKnownBlockNumber > bestSavedBlockNumber)
bestKnownBlockNumber
else
bestSavedBlockNumber
}

override def getLatestCheckpointBlockNumber(): BigInt = {
val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber()
// The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache.
// The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet
if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage)
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
else
latestCheckpointNumberInStorage
// The cached best block number should always be more up-to-date than the one on disk, we are keeping access to disk
// above only for logging purposes
bestKnownBlockNumber
}

override def getLatestCheckpointBlockNumber(): BigInt =
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber

override def getBestBlock(): Block = {
val bestBlockNumber = getBestBlockNumber()
log.debug("Trying to get best block with number {}", bestBlockNumber)
Expand Down Expand Up @@ -294,7 +292,7 @@ class BlockchainImpl(
val currentBestBlockNumber = getBestBlockNumber()
val currentBestCheckpointNumber = getLatestCheckpointBlockNumber()
log.debug(
"Persisting block info data into database. Persisted block number is {}. " +
"Persisting app info data into database. Persisted block number is {}. " +
"Persisted checkpoint number is {}",
currentBestBlockNumber,
currentBestCheckpointNumber
Expand All @@ -313,10 +311,6 @@ class BlockchainImpl(
.and(storeChainWeight(block.header.hash, weight))
.commit()

// not transactional part
// the best blocks data will be persisted only when the cache will be persisted
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)

if (saveAsBestBlock && block.hasCheckpoint) {
log.debug(
"New best known block block number - {}, new best checkpoint number - {}",
Expand All @@ -331,6 +325,10 @@ class BlockchainImpl(
)
saveBestKnownBlock(block.header.number)
}

// not transactional part
// the best blocks data will be persisted only when the cache is persisted
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
}

override def storeBlockHeader(blockHeader: BlockHeader): DataSourceBatchUpdate = {
Expand Down Expand Up @@ -388,87 +386,83 @@ class BlockchainImpl(
blockNumberMappingStorage.remove(number)
}

// scalastyle:off method.length
override def removeBlock(blockHash: ByteString, withState: Boolean): Unit = {
val maybeBlockHeader = getBlockHeaderByHash(blockHash)
val maybeBlock = getBlockByHash(blockHash)

log.debug(
"Trying to remove block with hash {} and number {}",
ByteStringUtils.hash2string(blockHash),
maybeBlockHeader.map(_.number)
)

val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList)
val bestBlocks = bestKnownBlockAndLatestCheckpoint.get()
// as we are decreasing block numbers in memory more often than in storage,
// we can't use here getBestBlockNumber / getLatestCheckpointBlockNumber
val bestBlockNumber =
if (bestBlocks.bestBlockNumber != 0) bestBlocks.bestBlockNumber else appStateStorage.getBestBlockNumber()
val latestCheckpointNumber = {
if (bestBlocks.latestCheckpointNumber != 0) bestBlocks.latestCheckpointNumber
else appStateStorage.getLatestCheckpointBlockNumber()
maybeBlock match {
case Some(block) => removeBlock(block, withState)
case None =>
log.warn(s"Attempted removing block with hash ${ByteStringUtils.hash2string(blockHash)} that we don't have")
}
}

val blockNumberMappingUpdates = {
maybeBlockHeader.fold(blockNumberMappingStorage.emptyBatchUpdate)(h =>
if (getHashByBlockNumber(h.number).contains(blockHash))
removeBlockNumberMapping(h.number)
else blockNumberMappingStorage.emptyBatchUpdate
)
}
// scalastyle:off method.length
private def removeBlock(block: Block, withState: Boolean): Unit = {
val blockHash = block.hash

val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate, Option[BigInt]) = maybeBlockHeader match {
case Some(header) =>
if (header.hasCheckpoint && header.number == latestCheckpointNumber) {
val prev = findPreviousCheckpointBlockNumber(header.number, header.number)
prev
.map { num =>
(appStateStorage.putLatestCheckpointBlockNumber(num), Some(num))
}
.getOrElse {
(appStateStorage.removeLatestCheckpointBlockNumber(), Some(0))
}
} else (appStateStorage.emptyBatchUpdate, None)
case None =>
(appStateStorage.emptyBatchUpdate, None)
}
log.debug(s"Trying to remove block block ${block.idTag}")

val newBestBlockNumber: BigInt = if (bestBlockNumber >= 1) bestBlockNumber - 1 else 0
val txList = block.body.transactionList
val bestBlockNumber = getBestBlockNumber()
val latestCheckpointNumber = getLatestCheckpointBlockNumber()

val blockNumberMappingUpdates =
if (getHashByBlockNumber(block.number).contains(blockHash))
removeBlockNumberMapping(block.number)
else blockNumberMappingStorage.emptyBatchUpdate

val newBestBlockNumber: BigInt = (bestBlockNumber - 1).max(0)
val newLatestCheckpointNumber: BigInt =
if (block.hasCheckpoint && block.number == latestCheckpointNumber) {
findPreviousCheckpointBlockNumber(block.number, block.number)
} else latestCheckpointNumber

/*
These two updates below are an exception to the rule of only updating the best blocks when persisting the node
cache.
They are required in case we are removing a block that's marked on db as the best (or as the last checkpoint),
to keep its consistency, as it will no longer be the best block (nor the last checkpoint).
These updates can't be done if the conditions are false, as we might not have the associated mpt nodes, and we
would fall into the case of having an incomplete best block and thus an inconsistent db
*/
val bestBlockNumberUpdates =
if (appStateStorage.getBestBlockNumber() > newBestBlockNumber)
appStateStorage.putBestBlockNumber(newBestBlockNumber)
else appStateStorage.emptyBatchUpdate
val latestCheckpointNumberUpdates =
if (appStateStorage.getLatestCheckpointBlockNumber() > newLatestCheckpointNumber)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: I am not sure why we need to update this block in storage when we already update it here:

    if (withState)
      stateStorage.onBlockRollback(block.number, bestBlockNumber) { () => persistBestBlocksData() }

when cache is full or requires flushing

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary for consistency's sake

But I wanted to follow the same approach as here; it's also best to keep this value as up-to-date as possible.

Though there are some conditions guarding these updates, they would not have happened if those conditions were false

appStateStorage.putLatestCheckpointBlockNumber(newLatestCheckpointNumber)
else appStateStorage.emptyBatchUpdate

log.debug(
"Persisting app info data into database. Persisted block number is {}. Persisted checkpoint number is {}",
newBestBlockNumber,
newLatestCheckpointNumber
)

blockHeadersStorage
.remove(blockHash)
.and(blockBodiesStorage.remove(blockHash))
.and(chainWeightStorage.remove(blockHash))
.and(receiptStorage.remove(blockHash))
.and(maybeTxList.fold(transactionMappingStorage.emptyBatchUpdate)(removeTxsLocations))
.and(removeTxsLocations(txList))
.and(blockNumberMappingUpdates)
.and(bestBlockNumberUpdates)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you are fixing best block number handling when rollbackin, do you think this ugly thing is still necessary in BlockchainImpl:

  //FIXME EC-495 this method should not be need when best block is handled properly during rollback
  def persistCachedNodes(): Unit = {
    if (stateStorage.forcePersist(RollBackFlush)) {
      persistBestBlocksData()
    }
  }

We always call it before rollbacks happen (it is an ugly hack from the past, as there were some troubles with the cached block number when rolling back earlier)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm it doesn't seem to be necessary and I'd gladly remove it... though I'm a bit unsure, just in case changing that introduces further issues

Though if you agree with removing it (or other reviewers do) I'll remove it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, if we can remove it now, let's do it! =)

.and(latestCheckpointNumberUpdates)
.commit()

// not transactional part
saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber)
saveBestKnownBlocks(newBestBlockNumber, Some(newLatestCheckpointNumber))
log.debug(
"Removed block with hash {}. New best block number - {}, new best checkpoint block number - {}",
ByteStringUtils.hash2string(blockHash),
newBestBlockNumber,
prevCheckpointNumber
newLatestCheckpointNumber
)

maybeBlockHeader.foreach { h =>
if (withState) {
val bestBlocksUpdates = appStateStorage
.putBestBlockNumber(newBestBlockNumber)
.and(checkpointUpdates)
stateStorage.onBlockRollback(h.number, bestBlockNumber) { () =>
log.debug(
"Persisting block info data into database. Persisted block number is {}. " +
"Persisted checkpoint number is {}",
newBestBlockNumber,
prevCheckpointNumber
)
bestBlocksUpdates.commit()
}
}
}
// not transactional part
if (withState)
stateStorage.onBlockRollback(block.number, bestBlockNumber) { () => persistBestBlocksData() }
}
// scalastyle:on method.length

Expand All @@ -485,7 +479,7 @@ class BlockchainImpl(
private def findPreviousCheckpointBlockNumber(
blockNumberToCheck: BigInt,
latestCheckpointBlockNumber: BigInt
): Option[BigInt] = {
): BigInt = {
if (blockNumberToCheck > 0) {
val maybePreviousCheckpointBlockNumber = for {
currentBlock <- getBlockByNumber(blockNumberToCheck)
Expand All @@ -494,10 +488,10 @@ class BlockchainImpl(
} yield currentBlock.number

maybePreviousCheckpointBlockNumber match {
case Some(_) => maybePreviousCheckpointBlockNumber
case Some(previousCheckpointBlockNumber) => previousCheckpointBlockNumber
case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1, latestCheckpointBlockNumber)
}
} else None
} else 0
}

private def saveTxsLocations(blockHash: ByteString, blockBody: BlockBody): DataSourceBatchUpdate =
Expand Down Expand Up @@ -549,13 +543,6 @@ class BlockchainImpl(
noEmptyAccounts = noEmptyAccounts,
ethCompatibleStorage = ethCompatibleStorage
)

//FIXME EC-495 this method should not be need when best block is handled properly during rollback
def persistCachedNodes(): Unit = {
if (stateStorage.forcePersist(RollBackFlush)) {
persistBestBlocksData()
}
}
}

trait BlockchainStorages {
Expand Down
2 changes: 0 additions & 2 deletions src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ class BlockImport(
*/
private def reorganiseChainFromQueue(queuedLeaf: ByteString): BlockImportResult = {
log.debug("Reorganising chain from leaf {}", ByteStringUtils.hash2string(queuedLeaf))
blockchain.persistCachedNodes()
val newBranch = blockQueue.getBranch(queuedLeaf, dequeue = true)
val bestNumber = blockchain.getBestBlockNumber()

Expand Down Expand Up @@ -242,7 +241,6 @@ class BlockImport(
weight <- blockchain.getChainWeightByHash(hash)
} yield BlockData(block, receipts, weight) :: removeBlocksUntil(parent, fromNumber - 1)

// Not updating best block number for efficiency, it will be updated in the callers anyway
blockchain.removeBlock(hash, withState = true)

blockList.getOrElse(Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.util.ByteString
import io.iohk.ethereum.db.cache.{LruCache, MapCache}
import io.iohk.ethereum.db.dataSource.EphemDataSource
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
import io.iohk.ethereum.db.storage.StateStorage.{GenesisDataLoad, RollBackFlush}
import io.iohk.ethereum.db.storage.StateStorage.GenesisDataLoad
import io.iohk.ethereum.db.storage.pruning.InMemoryPruning
import io.iohk.ethereum.mpt.LeafNode
import io.iohk.ethereum.utils.Config.NodeCacheConfig
Expand Down Expand Up @@ -40,7 +40,7 @@ class ReadOnlyNodeStorageSpec extends AnyFlatSpec with Matchers {
dataSource.storage.size shouldEqual 1
}

it should "be able to persist to underlying storage when Genesis loading and not persist durin rollback" in new TestSetup {
it should "be able to persist to underlying storage when Genesis loading" in new TestSetup {
val (nodeKey, nodeVal) = MptStorage.collapseNode(Some(newLeaf))._2.head
val readOnlyNodeStorage = cachedStateStorage.getReadOnlyStorage

Expand All @@ -53,9 +53,6 @@ class ReadOnlyNodeStorageSpec extends AnyFlatSpec with Matchers {

readOnlyNodeStorage.persist()

cachedStateStorage.forcePersist(RollBackFlush) shouldEqual false
dataSource.storage.size shouldEqual 0

cachedStateStorage.forcePersist(GenesisDataLoad) shouldEqual true
dataSource.storage.size shouldEqual 1
}
Expand Down
Loading