Skip to content

Commit 4e48c2e

Browse files
author
Aurélien Richez
authored
[ETCM-1069] Create ConsensusAdapter(#1096)
* make adapter pre-validate block * enqueue blocks in ConsensusAdapter * remove blockQueue from Consensus
1 parent 51ed552 commit 4e48c2e

20 files changed

+463
-229
lines changed

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher
2626
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter
2727
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.NewCheckpoint
2828
import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers
29-
import io.iohk.ethereum.consensus.Consensus
3029
import io.iohk.ethereum.consensus.ConsensusAdapter
3130
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
3231
import io.iohk.ethereum.consensus.pow.validators.OmmersValidator

src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,23 +124,25 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
124124
"peers should chose the branch with a checkpoint discarding blocks that come after the checkpoint" in customTestCaseResourceM(
125125
FakePeer.start2FakePeersRes()
126126
) { case (peer1, peer2) =>
127-
val length = 26
127+
val checkpointBlockNumber = 26
128128
for {
129129
_ <- peer1.importBlocksUntil(20)(IdentityUpdate)
130130
_ <- peer2.importBlocksUntil(30)(IdentityUpdate)
131131
_ <- peer1.startRegularSync()
132132
_ <- peer2.startRegularSync()
133-
_ <- peer2.addCheckpointedBlock(
133+
_ <- peer2.addCheckpointedBlock( // checkpointing block 26
134134
peer2.blockchainReader.getBlockByNumber(peer2.blockchainReader.getBestBranch(), 25).get
135135
)
136-
_ <- peer2.waitForRegularSyncLoadLastBlock(length)
136+
_ <- peer2.waitForRegularSyncLoadLastBlock(checkpointBlockNumber)
137137
_ <- peer1.connectToPeers(Set(peer2.node))
138-
_ <- peer1.waitForRegularSyncLoadLastBlock(length)
138+
_ <- peer1.waitForRegularSyncLoadLastBlock(checkpointBlockNumber)
139139
} yield {
140140
assert(peer1.blockchainReader.getBestBlock().get.hash == peer2.blockchainReader.getBestBlock().get.hash)
141141
val peer1BestBlockNumber = peer1.blockchainReader.getBestBlock().get.number
142142
val peer2BestBlockNumber = peer2.blockchainReader.getBestBlock().get.number
143-
assert(peer1BestBlockNumber == peer2BestBlockNumber && peer1BestBlockNumber == length)
143+
144+
assert(peer1BestBlockNumber == peer2BestBlockNumber && peer1BestBlockNumber == checkpointBlockNumber)
145+
assert(peer1.blockchainReader.getLatestCheckpointBlockNumber() == checkpointBlockNumber)
144146
assert(
145147
peer1.blockchainReader.getLatestCheckpointBlockNumber() == peer2.blockchainReader
146148
.getLatestCheckpointBlockNumber()

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ object RegularSyncItSpecUtils {
111111
bl,
112112
blockchainReader,
113113
blockchainWriter,
114-
blockQueue,
115114
blockExecution
116115
)
117116
lazy val consensusAdapter = new ConsensusAdapter(

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import akka.actor.Scheduler
99

1010
import io.iohk.ethereum.blockchain.sync.fast.FastSync
1111
import io.iohk.ethereum.blockchain.sync.regular.RegularSync
12-
import io.iohk.ethereum.consensus.Consensus
1312
import io.iohk.ethereum.consensus.ConsensusAdapter
1413
import io.iohk.ethereum.consensus.validators.Validators
1514
import io.iohk.ethereum.db.storage.AppStateStorage

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
2020
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
2121
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlocks
2222
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
23-
import io.iohk.ethereum.consensus.Consensus
2423
import io.iohk.ethereum.consensus.ConsensusAdapter
2524
import io.iohk.ethereum.crypto.kec256
2625
import io.iohk.ethereum.db.storage.StateStorage

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InternalLastBlockIm
1919
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
2020
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
2121
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressState
22-
import io.iohk.ethereum.consensus.Consensus
2322
import io.iohk.ethereum.consensus.ConsensusAdapter
2423
import io.iohk.ethereum.consensus.validators.BlockValidator
2524
import io.iohk.ethereum.db.storage.StateStorage

src/main/scala/io/iohk/ethereum/consensus/Consensus.scala

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
package io.iohk.ethereum.consensus
22

3+
import akka.util.ByteString
4+
5+
import cats.data.NonEmptyList
6+
37
import monix.eval.Task
48
import monix.execution.Scheduler
59

6-
import io.iohk.ethereum.blockchain.sync.regular.BlockImportResult
10+
import io.iohk.ethereum.consensus.Consensus.ConsensusResult
711
import io.iohk.ethereum.domain.Block
12+
import io.iohk.ethereum.domain.ChainWeight
13+
import io.iohk.ethereum.ledger.BlockData
14+
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
815
import io.iohk.ethereum.utils.BlockchainConfig
916

1017
/** This file documents the original interface that was designed at ETCM-1018
@@ -13,8 +20,8 @@ import io.iohk.ethereum.utils.BlockchainConfig
1320
*/
1421
trait Consensus {
1522
def evaluateBranch(
16-
block: Seq[Block]
17-
)(implicit blockExecutionScheduler: Scheduler, blockchainConfig: BlockchainConfig): Task[BlockImportResult]
23+
block: NonEmptyList[Block]
24+
)(implicit blockExecutionScheduler: Scheduler, blockchainConfig: BlockchainConfig): Task[ConsensusResult]
1825

1926
/** Original interface from ETCM-1018, for temporary documentation purposes
2027
*/
@@ -67,3 +74,44 @@ trait Consensus {
6774
// private def attemptToSetNewBestBranch(branch: UpdatedBranch): Either[BlockExecutionError, Boolean] = ???
6875

6976
}
77+
78+
object Consensus {
79+
/* This return type for consensus is probably overcomplicated for now because some information is needed
80+
* to keep the compatibility with the current code (particularly for the block queue handling), and be able
81+
* to translate the values to BlockImportResult.
82+
* In particular:
83+
* - `blockToEnqueue` fields won't be needed if the block are already stored in memory
84+
* - The distinction between ExtendedCurrentBestBranch and SelectedNewBestBranch won't really be useful
85+
* because there will be no need to put back the old branch into the block queue in case of reorganisation
86+
* - `ConsensusErrorDueToMissingNode` and `ConsensusError` would mean that the application is in an
87+
* inconsistent state. Unless there is a reason to think that mantis would self heal when that happens, I
88+
* don't think there is a reason to add them here.
89+
*/
90+
91+
sealed trait ConsensusResult
92+
93+
/** The new branch was selected and it extended the best branch. */
94+
case class ExtendedCurrentBestBranch(blockImportData: List[BlockData]) extends ConsensusResult
95+
96+
/** The new branch was selected and it extended the best branch, but it did not execute completely. */
97+
case class ExtendedCurrentBestBranchPartially(blockImportData: List[BlockData], failureBranch: BranchExecutionFailure)
98+
extends ConsensusResult
99+
100+
/** The new branch was selected but was not an extension of the best branch. */
101+
case class SelectedNewBestBranch(oldBranch: List[Block], newBranch: List[Block], weights: List[ChainWeight])
102+
extends ConsensusResult
103+
104+
/** The proposed new branch was not better than the current best one. */
105+
case object KeptCurrentBestBranch extends ConsensusResult
106+
107+
/** A block in the branch cannot be executed. */
108+
case class BranchExecutionFailure(blockToEnqueue: List[Block], failingBlockHash: ByteString, error: String)
109+
extends ConsensusResult
110+
111+
/** An error external the the blocks in the branch occured, which prevents the branch from being executed.
112+
* Usually this is due to an inconsistency in the database.
113+
*/
114+
case class ConsensusError(blockToEnqueue: List[Block], err: String) extends ConsensusResult
115+
case class ConsensusErrorDueToMissingNode(blockToEnqueue: List[Block], reason: MissingNodeException)
116+
extends ConsensusResult
117+
}

src/main/scala/io/iohk/ethereum/consensus/ConsensusAdapter.scala

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,24 @@
11
package io.iohk.ethereum.consensus
22

3+
import cats.data.NonEmptyList
4+
35
import monix.eval.Task
46
import monix.execution.Scheduler
57

8+
import io.iohk.ethereum.blockchain.sync.regular.BlockEnqueued
69
import io.iohk.ethereum.blockchain.sync.regular.BlockImportFailed
10+
import io.iohk.ethereum.blockchain.sync.regular.BlockImportFailedDueToMissingNode
711
import io.iohk.ethereum.blockchain.sync.regular.BlockImportResult
12+
import io.iohk.ethereum.blockchain.sync.regular.BlockImportedToTop
13+
import io.iohk.ethereum.blockchain.sync.regular.ChainReorganised
814
import io.iohk.ethereum.blockchain.sync.regular.DuplicateBlock
15+
import io.iohk.ethereum.consensus.Consensus.BranchExecutionFailure
16+
import io.iohk.ethereum.consensus.Consensus.ConsensusError
17+
import io.iohk.ethereum.consensus.Consensus.ConsensusErrorDueToMissingNode
18+
import io.iohk.ethereum.consensus.Consensus.ExtendedCurrentBestBranch
19+
import io.iohk.ethereum.consensus.Consensus.ExtendedCurrentBestBranchPartially
20+
import io.iohk.ethereum.consensus.Consensus.KeptCurrentBestBranch
21+
import io.iohk.ethereum.consensus.Consensus.SelectedNewBestBranch
922
import io.iohk.ethereum.domain.Block
1023
import io.iohk.ethereum.domain.BlockHeader
1124
import io.iohk.ethereum.domain.BlockchainReader
@@ -36,19 +49,59 @@ class ConsensusAdapter(
3649
if (isBlockADuplicate(block.header, bestBlock.header.number)) {
3750
log.debug("Ignoring duplicated block: {}", block.idTag)
3851
Task.now(DuplicateBlock)
52+
} else if (blockchainReader.getChainWeightByHash(bestBlock.header.hash).isEmpty) {
53+
// This part is not really needed except for compatibility as a missing chain weight
54+
// would indicate an inconsistent database
55+
returnNoTotalDifficulty(bestBlock)
3956
} else {
4057
doBlockPreValidation(block).flatMap {
4158
case Left(error) =>
4259
Task.now(BlockImportFailed(error.reason.toString))
4360
case Right(BlockExecutionSuccess) =>
44-
consensus.evaluateBranch(Seq(block))
61+
enqueueAndGetBranch(block, bestBlock.number)
62+
.map(forwardAndTranslateConsensusResult) // a new branch was created so we give it to consensus
63+
.getOrElse(Task.now(BlockEnqueued)) // the block was not rooted so it was simply enqueued
4564
}
4665
}
4766
case None =>
4867
log.error("Couldn't find the current best block")
4968
Task.now(BlockImportFailed("Couldn't find the current best block"))
5069
}
5170

71+
private def forwardAndTranslateConsensusResult(
72+
newBranch: NonEmptyList[Block]
73+
)(implicit blockExecutionScheduler: Scheduler, blockchainConfig: BlockchainConfig) =
74+
consensus
75+
.evaluateBranch(newBranch)
76+
.map {
77+
case SelectedNewBestBranch(oldBranch, newBranch, weights) =>
78+
oldBranch.foreach(blockQueue.enqueueBlock(_))
79+
ChainReorganised(oldBranch, newBranch, weights)
80+
case ExtendedCurrentBestBranch(blockImportData) =>
81+
BlockImportedToTop(blockImportData)
82+
case ExtendedCurrentBestBranchPartially(
83+
blockImportData,
84+
BranchExecutionFailure(blocksToEnqueue, failingBlockHash, error)
85+
) =>
86+
blocksToEnqueue.foreach(blockQueue.enqueueBlock(_))
87+
blockQueue.removeSubtree(failingBlockHash)
88+
log.warn("extended best branch partially because of error: {}", error)
89+
BlockImportedToTop(blockImportData)
90+
case KeptCurrentBestBranch =>
91+
newBranch.toList.foreach(blockQueue.enqueueBlock(_))
92+
BlockEnqueued
93+
case BranchExecutionFailure(blocksToEnqueue, failingBlockHash, error) =>
94+
blocksToEnqueue.foreach(blockQueue.enqueueBlock(_))
95+
blockQueue.removeSubtree(failingBlockHash)
96+
BlockImportFailed(error)
97+
case ConsensusError(blocksToEnqueue, error) =>
98+
blocksToEnqueue.foreach(blockQueue.enqueueBlock(_))
99+
BlockImportFailed(error)
100+
case ConsensusErrorDueToMissingNode(blocksToEnqueue, reason) =>
101+
blocksToEnqueue.foreach(blockQueue.enqueueBlock(_))
102+
BlockImportFailedDueToMissingNode(reason)
103+
}
104+
52105
private def doBlockPreValidation(block: Block)(implicit
53106
blockchainConfig: BlockchainConfig
54107
): Task[Either[ValidationBeforeExecError, BlockExecutionSuccess]] =
@@ -71,4 +124,22 @@ class ConsensusAdapter(
71124
block.number <= currentBestBlockNumber ||
72125
blockQueue.isQueued(hash)
73126
}
127+
128+
private def enqueueAndGetBranch(block: Block, bestBlockNumber: BigInt): Option[NonEmptyList[Block]] =
129+
blockQueue
130+
.enqueueBlock(block, bestBlockNumber)
131+
.map(topBlock => blockQueue.getBranch(topBlock.hash, dequeue = true))
132+
.flatMap(NonEmptyList.fromList)
133+
134+
private def returnNoTotalDifficulty(bestBlock: Block): Task[BlockImportFailed] = {
135+
log.error(
136+
"Getting total difficulty for current best block with hash: {} failed",
137+
bestBlock.header.hashAsHexString
138+
)
139+
Task.now(
140+
BlockImportFailed(
141+
s"Couldn't get total difficulty for current best block with hash: ${bestBlock.header.hashAsHexString}"
142+
)
143+
)
144+
}
74145
}

0 commit comments

Comments
 (0)