-
Notifications
You must be signed in to change notification settings - Fork 75
[ETCM-377] Fix consistency issue between cache and persisted block number on rollbacks #800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f6e7cb0
12e6cfa
7b2953e
6353484
f498403
084835e
f4b6504
b03d6ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ import akka.util.ByteString | |
import io.iohk.ethereum.db.dataSource.DataSourceBatchUpdate | ||
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError | ||
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash} | ||
import io.iohk.ethereum.db.storage.StateStorage.RollBackFlush | ||
import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation | ||
import io.iohk.ethereum.db.storage._ | ||
import io.iohk.ethereum.db.storage.pruning.PruningMode | ||
|
@@ -223,8 +222,14 @@ class BlockchainImpl( | |
|
||
// There is always only one writer thread (ensured by actor), but can by many readers (api calls) | ||
// to ensure visibility of writes, needs to be volatile or atomic ref | ||
private val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] = | ||
new AtomicReference(BestBlockLatestCheckpointNumbers(BigInt(0), BigInt(0))) | ||
// Laziness required for mocking BlockchainImpl on tests | ||
private lazy val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] = | ||
new AtomicReference( | ||
BestBlockLatestCheckpointNumbers( | ||
appStateStorage.getBestBlockNumber(), | ||
appStateStorage.getLatestCheckpointBlockNumber() | ||
) | ||
) | ||
|
||
override def getBlockHeaderByHash(hash: ByteString): Option[BlockHeader] = | ||
blockHeadersStorage.get(hash) | ||
|
@@ -246,22 +251,15 @@ class BlockchainImpl( | |
bestSavedBlockNumber, | ||
bestKnownBlockNumber | ||
) | ||
if (bestKnownBlockNumber > bestSavedBlockNumber) | ||
bestKnownBlockNumber | ||
else | ||
bestSavedBlockNumber | ||
} | ||
|
||
override def getLatestCheckpointBlockNumber(): BigInt = { | ||
val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber() | ||
// The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache. | ||
// The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet | ||
if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage) | ||
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber | ||
else | ||
latestCheckpointNumberInStorage | ||
// The cached best block number should always be more up-to-date than the one on disk, we are keeping access to disk | ||
// above only for logging purposes | ||
bestKnownBlockNumber | ||
} | ||
|
||
override def getLatestCheckpointBlockNumber(): BigInt = | ||
bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber | ||
|
||
override def getBestBlock(): Block = { | ||
val bestBlockNumber = getBestBlockNumber() | ||
log.debug("Trying to get best block with number {}", bestBlockNumber) | ||
|
@@ -294,7 +292,7 @@ class BlockchainImpl( | |
val currentBestBlockNumber = getBestBlockNumber() | ||
val currentBestCheckpointNumber = getLatestCheckpointBlockNumber() | ||
log.debug( | ||
"Persisting block info data into database. Persisted block number is {}. " + | ||
"Persisting app info data into database. Persisted block number is {}. " + | ||
"Persisted checkpoint number is {}", | ||
currentBestBlockNumber, | ||
currentBestCheckpointNumber | ||
|
@@ -313,10 +311,6 @@ class BlockchainImpl( | |
.and(storeChainWeight(block.header.hash, weight)) | ||
.commit() | ||
|
||
// not transactional part | ||
// the best blocks data will be persisted only when the cache will be persisted | ||
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData) | ||
|
||
if (saveAsBestBlock && block.hasCheckpoint) { | ||
log.debug( | ||
"New best known block block number - {}, new best checkpoint number - {}", | ||
|
@@ -331,6 +325,10 @@ class BlockchainImpl( | |
) | ||
saveBestKnownBlock(block.header.number) | ||
} | ||
|
||
// not transactional part | ||
// the best blocks data will be persisted only when the cache will be persisted | ||
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData) | ||
} | ||
|
||
override def storeBlockHeader(blockHeader: BlockHeader): DataSourceBatchUpdate = { | ||
|
@@ -388,87 +386,83 @@ class BlockchainImpl( | |
blockNumberMappingStorage.remove(number) | ||
} | ||
|
||
// scalastyle:off method.length | ||
override def removeBlock(blockHash: ByteString, withState: Boolean): Unit = { | ||
val maybeBlockHeader = getBlockHeaderByHash(blockHash) | ||
val maybeBlock = getBlockByHash(blockHash) | ||
|
||
log.debug( | ||
"Trying to remove block with hash {} and number {}", | ||
ByteStringUtils.hash2string(blockHash), | ||
maybeBlockHeader.map(_.number) | ||
) | ||
|
||
val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList) | ||
val bestBlocks = bestKnownBlockAndLatestCheckpoint.get() | ||
// as we are decreasing block numbers in memory more often than in storage, | ||
// we can't use here getBestBlockNumber / getLatestCheckpointBlockNumber | ||
val bestBlockNumber = | ||
if (bestBlocks.bestBlockNumber != 0) bestBlocks.bestBlockNumber else appStateStorage.getBestBlockNumber() | ||
val latestCheckpointNumber = { | ||
if (bestBlocks.latestCheckpointNumber != 0) bestBlocks.latestCheckpointNumber | ||
else appStateStorage.getLatestCheckpointBlockNumber() | ||
maybeBlock match { | ||
case Some(block) => removeBlock(block, withState) | ||
case None => | ||
log.warn(s"Attempted removing block with hash ${ByteStringUtils.hash2string(blockHash)} that we don't have") | ||
} | ||
} | ||
|
||
val blockNumberMappingUpdates = { | ||
maybeBlockHeader.fold(blockNumberMappingStorage.emptyBatchUpdate)(h => | ||
if (getHashByBlockNumber(h.number).contains(blockHash)) | ||
removeBlockNumberMapping(h.number) | ||
else blockNumberMappingStorage.emptyBatchUpdate | ||
) | ||
} | ||
// scalastyle:off method.length | ||
private def removeBlock(block: Block, withState: Boolean): Unit = { | ||
val blockHash = block.hash | ||
|
||
val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate, Option[BigInt]) = maybeBlockHeader match { | ||
case Some(header) => | ||
if (header.hasCheckpoint && header.number == latestCheckpointNumber) { | ||
val prev = findPreviousCheckpointBlockNumber(header.number, header.number) | ||
prev | ||
.map { num => | ||
(appStateStorage.putLatestCheckpointBlockNumber(num), Some(num)) | ||
} | ||
.getOrElse { | ||
(appStateStorage.removeLatestCheckpointBlockNumber(), Some(0)) | ||
} | ||
} else (appStateStorage.emptyBatchUpdate, None) | ||
case None => | ||
(appStateStorage.emptyBatchUpdate, None) | ||
} | ||
log.debug(s"Trying to remove block block ${block.idTag}") | ||
|
||
val newBestBlockNumber: BigInt = if (bestBlockNumber >= 1) bestBlockNumber - 1 else 0 | ||
val txList = block.body.transactionList | ||
val bestBlockNumber = getBestBlockNumber() | ||
val latestCheckpointNumber = getLatestCheckpointBlockNumber() | ||
|
||
val blockNumberMappingUpdates = | ||
if (getHashByBlockNumber(block.number).contains(blockHash)) | ||
removeBlockNumberMapping(block.number) | ||
else blockNumberMappingStorage.emptyBatchUpdate | ||
|
||
val newBestBlockNumber: BigInt = (bestBlockNumber - 1).max(0) | ||
val newLatestCheckpointNumber: BigInt = | ||
if (block.hasCheckpoint && block.number == latestCheckpointNumber) { | ||
findPreviousCheckpointBlockNumber(block.number, block.number) | ||
} else latestCheckpointNumber | ||
|
||
/* | ||
This two below updates are an exception to the rule of only updating the best blocks when persisting the node | ||
cache. | ||
They are required in case we are removing a block that's marked on db as the best (or as the last checkpoint), | ||
to keep it's consistency, as it will no longer be the best block (nor the last checkpoint). | ||
This updates can't be done if the conditions are false as we might not have the associated mpt nodes, so falling | ||
into the case of having an incomplete best block and so an inconsistent db | ||
*/ | ||
val bestBlockNumberUpdates = | ||
if (appStateStorage.getBestBlockNumber() > newBestBlockNumber) | ||
appStateStorage.putBestBlockNumber(newBestBlockNumber) | ||
else appStateStorage.emptyBatchUpdate | ||
val latestCheckpointNumberUpdates = | ||
if (appStateStorage.getLatestCheckpointBlockNumber() > newLatestCheckpointNumber) | ||
appStateStorage.putLatestCheckpointBlockNumber(newLatestCheckpointNumber) | ||
else appStateStorage.emptyBatchUpdate | ||
|
||
log.debug( | ||
"Persisting app info data into database. Persisted block number is {}. Persisted checkpoint number is {}", | ||
newBestBlockNumber, | ||
newLatestCheckpointNumber | ||
) | ||
|
||
blockHeadersStorage | ||
.remove(blockHash) | ||
.and(blockBodiesStorage.remove(blockHash)) | ||
.and(chainWeightStorage.remove(blockHash)) | ||
.and(receiptStorage.remove(blockHash)) | ||
.and(maybeTxList.fold(transactionMappingStorage.emptyBatchUpdate)(removeTxsLocations)) | ||
.and(removeTxsLocations(txList)) | ||
.and(blockNumberMappingUpdates) | ||
.and(bestBlockNumberUpdates) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you are fixing best block number handling when rollbackin, do you think this ugly thing is still necessary in
We always call it before rollbacks happen (it is ugly hack from the past, as there was some troubles with cached block number when rollbacking earlier) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm it doesn't seem to be necessary and I'd gladly remove it... though I'm a bit unsure just in case I add further possibilities of introduced issues by changing that Though if you agree with removing it (or other reviewers do) I'll remove it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO, if we can remove it now, let's do it! =) |
||
.and(latestCheckpointNumberUpdates) | ||
.commit() | ||
|
||
// not transactional part | ||
saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber) | ||
saveBestKnownBlocks(newBestBlockNumber, Some(newLatestCheckpointNumber)) | ||
log.debug( | ||
"Removed block with hash {}. New best block number - {}, new best checkpoint block number - {}", | ||
ByteStringUtils.hash2string(blockHash), | ||
newBestBlockNumber, | ||
prevCheckpointNumber | ||
newLatestCheckpointNumber | ||
) | ||
|
||
maybeBlockHeader.foreach { h => | ||
if (withState) { | ||
val bestBlocksUpdates = appStateStorage | ||
.putBestBlockNumber(newBestBlockNumber) | ||
.and(checkpointUpdates) | ||
stateStorage.onBlockRollback(h.number, bestBlockNumber) { () => | ||
log.debug( | ||
"Persisting block info data into database. Persisted block number is {}. " + | ||
"Persisted checkpoint number is {}", | ||
newBestBlockNumber, | ||
prevCheckpointNumber | ||
) | ||
bestBlocksUpdates.commit() | ||
} | ||
} | ||
} | ||
// not transactional part | ||
if (withState) | ||
stateStorage.onBlockRollback(block.number, bestBlockNumber) { () => persistBestBlocksData() } | ||
} | ||
// scalastyle:on method.length | ||
|
||
|
@@ -485,7 +479,7 @@ class BlockchainImpl( | |
private def findPreviousCheckpointBlockNumber( | ||
blockNumberToCheck: BigInt, | ||
latestCheckpointBlockNumber: BigInt | ||
): Option[BigInt] = { | ||
): BigInt = { | ||
if (blockNumberToCheck > 0) { | ||
val maybePreviousCheckpointBlockNumber = for { | ||
currentBlock <- getBlockByNumber(blockNumberToCheck) | ||
|
@@ -494,10 +488,10 @@ class BlockchainImpl( | |
} yield currentBlock.number | ||
|
||
maybePreviousCheckpointBlockNumber match { | ||
case Some(_) => maybePreviousCheckpointBlockNumber | ||
case Some(previousCheckpointBlockNumber) => previousCheckpointBlockNumber | ||
case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1, latestCheckpointBlockNumber) | ||
} | ||
} else None | ||
} else 0 | ||
} | ||
|
||
private def saveTxsLocations(blockHash: ByteString, blockBody: BlockBody): DataSourceBatchUpdate = | ||
|
@@ -549,13 +543,6 @@ class BlockchainImpl( | |
noEmptyAccounts = noEmptyAccounts, | ||
ethCompatibleStorage = ethCompatibleStorage | ||
) | ||
|
||
//FIXME EC-495 this method should not be need when best block is handled properly during rollback | ||
def persistCachedNodes(): Unit = { | ||
if (stateStorage.forcePersist(RollBackFlush)) { | ||
persistBestBlocksData() | ||
} | ||
} | ||
} | ||
|
||
trait BlockchainStorages { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: I am not sure why we need to update this block in storage when we update it when we update it here:
when cache is full or requires flushing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not necessary for consistency's sake
But I wanted to follow the same approach than here, it's also best to keep this value as up-to-date as possible.
Though we have some conditions before to update this values, it could have not been the case if they were false