Skip to content

[ETCM-75] Support for checkpoint blocks in Blockchain #716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,13 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit

def saveBlockNumber(number: BigInt, hash: NodeHash): Unit = ???

def saveBestKnownBlock(number: BigInt): Unit = ???
def saveBestKnownBlocks(bestBlockNumber: BigInt, latestCheckpointNumber: Option[BigInt] = None): Unit = ???

def getBestBlock(): Block = ???

override def save(block: Block, receipts: Seq[Receipt], totalDifficulty: BigInt, saveAsBestBlock: Boolean): Unit = ???

override def getStateStorage: StateStorage = ???

override def getLatestCheckpointBlockNumber(): BigInt = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class FastSync(
blockchain.removeBlock(headerToRemove.hash, withState = false)
}
}
// TODO (maybe ETCM-77): Manage last checkpoint number too
appStateStorage.putBestBlockNumber((startBlock - blocksToDiscard - 1) max 0).commit()
}

Expand Down
26 changes: 13 additions & 13 deletions src/main/scala/io/iohk/ethereum/db/storage/StateStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ trait StateStorage {
def getBackingStorage(bn: BigInt): MptStorage
def getReadOnlyStorage: MptStorage

def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit
def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit
def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit
def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit

def saveNode(nodeHash: NodeHash, nodeEncoded: NodeEncoded, bn: BigInt)
def getNode(nodeHash: NodeHash): Option[MptNode]
Expand All @@ -34,15 +34,15 @@ class ArchiveStateStorage(private val nodeStorage: NodeStorage,
true
}

override def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit = {
override def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit = {
if (cachedNodeStorage.persist()) {
updateBestBlock(None)
updateBestBlocksData()
}
}

override def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit = {
override def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit = {
if (cachedNodeStorage.persist()) {
updateBestBlock(None)
updateBestBlocksData()
}
}

Expand Down Expand Up @@ -71,21 +71,21 @@ class ReferenceCountedStateStorage(private val nodeStorage: NodeStorage,
true
}

override def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit = {
override def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit = {
val blockToPrune = bn - pruningHistory

ReferenceCountNodeStorage.prune(blockToPrune, cachedNodeStorage, inMemory = blockToPrune > currentBestSavedBlock)

if (cachedNodeStorage.persist()) {
updateBestBlock(None)
updateBestBlocksData()
}
}

override def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit = {
override def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit = {
ReferenceCountNodeStorage.rollback(bn, cachedNodeStorage, inMemory = bn > currentBestSavedBlock)

if (cachedNodeStorage.persist()) {
updateBestBlock(None)
updateBestBlocksData()
}
}

Expand Down Expand Up @@ -120,19 +120,19 @@ class CachedReferenceCountedStateStorage(private val nodeStorage: NodeStorage,
}
}

override def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit = {
override def onBlockSave(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit = {
val blockToPrune = bn - pruningHistory
changeLog.persistChangeLog(bn)
changeLog.getDeathRowFromStorage(blockToPrune).foreach {deathRow =>
CachedReferenceCountedStorage.prune(deathRow, lruCache, blockToPrune)
}
if (CachedReferenceCountedStorage.persistCache(lruCache, nodeStorage)) {
updateBestBlock(None)
updateBestBlocksData()
}
changeLog.removeBlockMetaData(blockToPrune)
}

override def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlock: Option[BigInt] => Unit): Unit = {
override def onBlockRollback(bn: BigInt, currentBestSavedBlock: BigInt)(updateBestBlocksData: () => Unit): Unit = {
changeLog.getChangeLogFromStorage(bn).foreach { changeLog =>
CachedReferenceCountedStorage.rollback(lruCache, nodeStorage, changeLog, bn)
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/io/iohk/ethereum/domain/Block.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ case class Block(header: BlockHeader, body: BlockBody) {

def hash: ByteString = header.hash

val hasCheckpoint: Boolean = header.hasCheckpoint

def isParentOf(child: Block): Boolean = number + 1 == child.number && child.header.parentHash == hash
}

Expand Down
119 changes: 104 additions & 15 deletions src/main/scala/io/iohk/ethereum/domain/Blockchain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation
import io.iohk.ethereum.db.storage._
import io.iohk.ethereum.db.storage.pruning.PruningMode
import io.iohk.ethereum.domain
import io.iohk.ethereum.domain.BlockchainImpl.BestBlockLatestCheckpointNumbers
import io.iohk.ethereum.ledger.{InMemoryWorldStateProxy, InMemoryWorldStateProxyStorage}
import io.iohk.ethereum.mpt.{MerklePatriciaTrie, MptNode}
import io.iohk.ethereum.vm.{Storage, WorldStateProxy}

import scala.annotation.tailrec

/**
* Entity to be used to persist and query Blockchain related objects (blocks, transactions, ommers)
*/
Expand Down Expand Up @@ -123,6 +126,7 @@ trait Blockchain {

def getBestBlock(): Block

def getLatestCheckpointBlockNumber(): BigInt

/**
* Persists full block along with receipts and total difficulty
Expand Down Expand Up @@ -158,7 +162,7 @@ trait Blockchain {

def storeTotalDifficulty(blockhash: ByteString, totalDifficulty: BigInt): DataSourceBatchUpdate

def saveBestKnownBlock(number: BigInt): Unit
def saveBestKnownBlocks(bestBlockNumber: BigInt, latestCheckpointNumber: Option[BigInt] = None): Unit

def saveNode(nodeHash: NodeHash, nodeEncoded: NodeEncoded, blockNumber: BigInt): Unit

Expand Down Expand Up @@ -209,7 +213,8 @@ class BlockchainImpl(

// There is always only one writer thread (ensured by actor), but can by many readers (api calls)
// to ensure visibility of writes, needs to be volatile or atomic ref
private val bestKnownBlock: AtomicReference[BigInt] = new AtomicReference(BigInt(0))
private val bestKnownBlockAndLatestCheckpoint: AtomicReference[BestBlockLatestCheckpointNumbers] =
new AtomicReference(BestBlockLatestCheckpointNumbers(BigInt(0), BigInt(0)))

override def getBlockHeaderByHash(hash: ByteString): Option[BlockHeader] =
blockHeadersStorage.get(hash)
Expand All @@ -225,12 +230,22 @@ class BlockchainImpl(

override def getBestBlockNumber(): BigInt = {
val bestBlockNum = appStateStorage.getBestBlockNumber()
if (bestKnownBlock.get() > bestBlockNum)
bestKnownBlock.get()
if (bestKnownBlockAndLatestCheckpoint.get().bestBlockNumber > bestBlockNum)
bestKnownBlockAndLatestCheckpoint.get().bestBlockNumber
else
bestBlockNum
}

override def getLatestCheckpointBlockNumber(): BigInt = {
val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber()
// The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache.
// The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet
if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage)
Copy link
Contributor

@rtkaczyk rtkaczyk Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain in what situation these numbers can be different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure every border case here, but as far as I understand the best block number is firstly saved in memory and then persisted to the storage only when it's time to persist cache. I want to save the best block number and latest checkpoint transactionally so getLatestCheckpointBlockNumber behaves the same. We can have the latest checkpoint number in memory bigger than the number in storage because the cache wasn't persisted yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, maybe add a comment about it?

bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
else
latestCheckpointNumberInStorage
}

override def getBestBlock(): Block =
getBlockByNumber(getBestBlockNumber()).get

Expand All @@ -252,8 +267,10 @@ class BlockchainImpl(
ByteString(mpt.get(position).getOrElse(BigInt(0)).toByteArray)
}

def saveBestBlock(bestBlock: Option[BigInt]): Unit = {
bestBlock.fold(appStateStorage.putBestBlockNumber(getBestBlockNumber()).commit())(best => appStateStorage.putBestBlockNumber(best).commit())
private def persistBestBlocksData(): Unit = {
appStateStorage.putBestBlockNumber(getBestBlockNumber())
.and(appStateStorage.putLatestCheckpointBlockNumber(getLatestCheckpointBlockNumber()))
.commit()
}

def save(block: Block, receipts: Seq[Receipt], totalDifficulty: BigInt, saveAsBestBlock: Boolean): Unit = {
Expand All @@ -263,8 +280,12 @@ class BlockchainImpl(
.commit()

// not transactional part
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(saveBestBlock)
if (saveAsBestBlock) {
// the best blocks data will be persisted only when the cache will be persisted
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)

if (saveAsBestBlock && block.hasCheckpoint) {
saveBestKnownBlockAndLatestCheckpointNumber(block.header.number, block.header.number)
} else if (saveAsBestBlock) {
saveBestKnownBlock(block.header.number)
}
}
Expand All @@ -289,8 +310,21 @@ class BlockchainImpl(
override def storeEvmCode(hash: ByteString, evmCode: ByteString): DataSourceBatchUpdate =
evmCodeStorage.put(hash, evmCode)

override def saveBestKnownBlock(number: BigInt): Unit = {
bestKnownBlock.set(number)
override def saveBestKnownBlocks(bestBlockNumber: BigInt, latestCheckpointNumber: Option[BigInt] = None): Unit = {
latestCheckpointNumber match {
case Some(number) =>
saveBestKnownBlockAndLatestCheckpointNumber(bestBlockNumber, number)
case None =>
saveBestKnownBlock(bestBlockNumber)
}
}

private def saveBestKnownBlock(bestBlockNumber: BigInt): Unit = {
bestKnownBlockAndLatestCheckpoint.updateAndGet(_.copy(bestBlockNumber = bestBlockNumber))
}

private def saveBestKnownBlockAndLatestCheckpointNumber(number: BigInt, latestCheckpointNumber: BigInt): Unit = {
bestKnownBlockAndLatestCheckpoint.set(BestBlockLatestCheckpointNumbers(number, latestCheckpointNumber))
}

def storeTotalDifficulty(blockhash: ByteString, td: BigInt): DataSourceBatchUpdate =
Expand All @@ -310,10 +344,18 @@ class BlockchainImpl(
blockNumberMappingStorage.remove(number)
}

// scalastyle:off method.length
override def removeBlock(blockHash: ByteString, withState: Boolean): Unit = {
val maybeBlockHeader = getBlockHeaderByHash(blockHash)
val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList)
val bestSavedBlock = getBestBlockNumber()
val bestBlocks = bestKnownBlockAndLatestCheckpoint.get()
// as we are decreasing block numbers in memory more often than in storage,
// we can't use here getBestBlockNumber / getLatestCheckpointBlockNumber
val bestBlockNumber = if(bestBlocks.bestBlockNumber != 0) bestBlocks.bestBlockNumber else appStateStorage.getBestBlockNumber()
val latestCheckpointNumber = {
if(bestBlocks.latestCheckpointNumber != 0) bestBlocks.latestCheckpointNumber
else appStateStorage.getLatestCheckpointBlockNumber()
}

val blockNumberMappingUpdates = {
maybeBlockHeader.fold(blockNumberMappingStorage.emptyBatchUpdate)( h =>
Expand All @@ -323,6 +365,22 @@ class BlockchainImpl(
)
}

val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate, Option[BigInt]) = maybeBlockHeader match {
case Some(header) =>
if (header.hasCheckpoint && header.number == latestCheckpointNumber) {
val prev = findPreviousCheckpointBlockNumber(header.number, header.number)
prev.map { num =>
(appStateStorage.putLatestCheckpointBlockNumber(num), Some(num))
}.getOrElse {
(appStateStorage.removeLatestCheckpointBlockNumber(), Some(0))
}
} else (appStateStorage.emptyBatchUpdate, None)
case None =>
(appStateStorage.emptyBatchUpdate, None)
}

val newBestBlockNumber: BigInt = if(bestBlockNumber >= 1) bestBlockNumber - 1 else 0

blockHeadersStorage.remove(blockHash)
.and(blockBodiesStorage.remove(blockHash))
.and(totalDifficultyStorage.remove(blockHash))
Expand All @@ -332,11 +390,40 @@ class BlockchainImpl(
.commit()

// not transactional part
saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add this to our save function as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in save is different logic which controls what blocks data will be saved, so it's better to use low-level methods there

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And much more error prone... I think we'll suffer from this eventually 😕


maybeBlockHeader.foreach { h =>
if (withState)
stateStorage.onBlockRollback(h.number, bestSavedBlock)(saveBestBlock)
if (withState) {
val bestBlocksUpdates = appStateStorage.putBestBlockNumber(newBestBlockNumber)
.and(checkpointUpdates)
stateStorage.onBlockRollback(h.number, bestBlockNumber)(() => bestBlocksUpdates.commit())
}
}
}
// scalastyle:on method.length

/**
* Recursive function which try to find the previous checkpoint by traversing blocks from top to the bottom.
* In case of finding the checkpoint block number, the function will finish the job and return result
*/
@tailrec
private def findPreviousCheckpointBlockNumber(
blockNumberToCheck: BigInt,
latestCheckpointBlockNumber: BigInt
): Option[BigInt] = {
if (blockNumberToCheck > 0) {
val maybePreviousCheckpointBlockNumber = for {
currentBlock <- getBlockByNumber(blockNumberToCheck)
if currentBlock.hasCheckpoint &&
currentBlock.number < latestCheckpointBlockNumber
} yield currentBlock.number

maybePreviousCheckpointBlockNumber match {
case Some(_) => maybePreviousCheckpointBlockNumber
case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1, latestCheckpointBlockNumber)
}
} else None
}

private def saveTxsLocations(blockHash: ByteString, blockBody: BlockBody): DataSourceBatchUpdate =
blockBody.transactionList.zipWithIndex.foldLeft(transactionMappingStorage.emptyBatchUpdate) {
Expand Down Expand Up @@ -386,8 +473,8 @@ class BlockchainImpl(

//FIXME EC-495 this method should not be need when best block is handled properly during rollback
def persistCachedNodes(): Unit = {
if (stateStorage.forcePersist(RollBackFlush)){
appStateStorage.putBestBlockNumber(getBestBlockNumber()).commit()
if (stateStorage.forcePersist(RollBackFlush)) {
persistBestBlocksData()
}
}
}
Expand Down Expand Up @@ -423,4 +510,6 @@ object BlockchainImpl {
appStateStorage = storages.appStateStorage,
stateStorage = storages.stateStorage
)

private case class BestBlockLatestCheckpointNumbers(bestBlockNumber: BigInt, latestCheckpointNumber: BigInt)
}
1 change: 0 additions & 1 deletion src/main/scala/io/iohk/ethereum/jsonrpc/TestService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ class TestService(
(blockchain.getBestBlockNumber() until request.blockNum by -1).foreach { n =>
blockchain.removeBlock(blockchain.getBlockHeaderByNumber(n).get.hash, withState = false)
}
blockchain.saveBestKnownBlock(request.blockNum)
Future.successful(Right(RewindToBlockResponse()))
}

Expand Down
12 changes: 8 additions & 4 deletions src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class BlockImport(
validationResult <- validationResult
importResult <- importResult
} yield {
validationResult.fold(error => handleImportTopValidationError(error, block, currentBestBlock, importResult), _ => importResult)
validationResult.fold(error => handleImportTopValidationError(error, block, importResult), _ => importResult)
}
}

Expand Down Expand Up @@ -82,7 +82,6 @@ class BlockImport(
private def handleImportTopValidationError(
error: ValidationBeforeExecError,
block: Block,
bestBlockBeforeImport: Block,
blockImportResult: BlockImportResult
): BlockImportResult = {
blockImportResult match {
Expand All @@ -92,7 +91,6 @@ class BlockImport(
blockQueue.removeSubtree(hash)
blockchain.removeBlock(hash, withState = true)
}
blockchain.saveBestKnownBlock(bestBlockBeforeImport.header.number)
case _ => ()
}
handleBlockValidationError(error, block)
Expand Down Expand Up @@ -197,8 +195,14 @@ class BlockImport(
blockchain.save(block, receipts, td, saveAsBestBlock = false)
}

import cats.implicits._
val checkpointNumber = oldBranch
.collect {
case BlockData(block, _, _) if block.hasCheckpoint => block.number
}.maximumOption

val bestNumber = oldBranch.last.block.header.number
blockchain.saveBestKnownBlock(bestNumber)
blockchain.saveBestKnownBlocks(bestNumber, checkpointNumber)
executedBlocks.foreach(data => blockQueue.enqueueBlock(data.block, bestNumber))

newBranch.diff(executedBlocks.map(_.block)).headOption.foreach { block =>
Expand Down
Loading