Skip to content

Commit f0befc1

Browse files
committed
[ETCM-76] network messages for checkpointing
1 parent ae398e1 commit f0befc1

37 files changed

+604
-121
lines changed

src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
158158
override val peerConfiguration: PeerConfiguration = peerConf
159159
override val blockchain: Blockchain = bl
160160
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
161+
override val blockchainConfig = CommonFakePeer.this.blockchainConfig // FIXME: remove in ETCM-280
161162
}
162163

163164
lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ object RegularSyncItSpecUtils {
7070
peerEventBus,
7171
ledger,
7272
bl,
73+
blockchainConfig, // FIXME: remove in ETCM-280
7374
testSyncConfig,
7475
ommersPool,
7576
pendingTransactionsManager,

src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
1919
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
2020
import io.iohk.ethereum.network.{ForkResolver, PeerEventBusActor, PeerManagerActor}
2121
import io.iohk.ethereum.nodebuilder.{AuthHandshakerBuilder, NodeKeyBuilder, SecureRandomBuilder}
22-
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus}
22+
import io.iohk.ethereum.utils.{BlockchainConfig, Config, NodeStatus, ServerStatus}
2323
import java.util.concurrent.atomic.AtomicReference
2424

2525
import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource}
@@ -82,6 +82,7 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
8282
override val nodeStatusHolder: AtomicReference[NodeStatus] = DumpChainApp.nodeStatusHolder
8383
override val peerConfiguration: PeerConfiguration = peerConfig
8484
override val blockchain: Blockchain = DumpChainApp.blockchain
85+
override val blockchainConfig: BlockchainConfig = DumpChainApp.blockchainConfig
8586
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
8687
}
8788

src/main/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcast.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {
3535
}
3636

3737
private def shouldSendNewBlock(newBlock: NewBlock, peerInfo: PeerInfo): Boolean =
38-
newBlock.block.header.number > peerInfo.maxBlockNumber || newBlock.totalDifficulty > peerInfo.totalDifficulty
38+
newBlock.block.header.number > peerInfo.maxBlockNumber ||
39+
newBlock.totalDifficulty > peerInfo.totalDifficulty ||
40+
newBlock.latestCheckpointNumber > peerInfo.latestCheckpointNumber
3941

4042
private def broadcastNewBlock(newBlock: NewBlock, peers: Set[Peer]): Unit =
4143
obtainRandomPeerSubset(peers).foreach { peer =>

src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,12 @@ object PeersClient {
161161

162162
def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = {
163163
val peersToUse = peersToDownloadFrom
164-
.collect { case (ref, PeerInfo(_, totalDifficulty, true, _, _)) =>
165-
(ref, totalDifficulty)
164+
.collect { case (ref, PeerInfo(_, totalDifficulty, latestChkp, true, _, _)) =>
165+
(ref, totalDifficulty, latestChkp)
166166
}
167167

168168
if (peersToUse.nonEmpty) {
169-
val (peer, _) = peersToUse.maxBy { case (_, td) => td }
169+
val (peer, _, _) = peersToUse.maxBy { case (_, td, latestChkp) => latestChkp -> td }
170170
Some(peer)
171171
} else {
172172
None

src/main/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelector.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,9 @@ class PivotBlockSelector(
173173
}
174174

175175
private def collectVoters: ElectionDetails = {
176-
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
177-
(peer, maxBlockNumber)
176+
val peersUsedToChooseTarget = peersToDownloadFrom.collect {
177+
case (peer, PeerInfo(_, _, _, true, maxBlockNumber, _)) =>
178+
(peer, maxBlockNumber)
178179
}
179180

180181
val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => -number }

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import io.iohk.ethereum.consensus.validators.Validators
77
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
88
import io.iohk.ethereum.domain.Blockchain
99
import io.iohk.ethereum.ledger.Ledger
10+
import io.iohk.ethereum.utils.BlockchainConfig
1011
import io.iohk.ethereum.utils.Config.SyncConfig
1112

1213
class SyncController(
1314
appStateStorage: AppStateStorage,
1415
blockchain: Blockchain,
16+
blockchainConfig: BlockchainConfig,
1517
fastSyncStateStorage: FastSyncStateStorage,
1618
ledger: Ledger,
1719
validators: Validators,
@@ -101,6 +103,7 @@ class SyncController(
101103
peerEventBus,
102104
ledger,
103105
blockchain,
106+
blockchainConfig,
104107
syncConfig,
105108
ommersPool,
106109
pendingTransactionsManager,
@@ -120,6 +123,7 @@ object SyncController {
120123
def props(
121124
appStateStorage: AppStateStorage,
122125
blockchain: Blockchain,
126+
blockchainConfig: BlockchainConfig,
123127
syncStateStorage: FastSyncStateStorage,
124128
ledger: Ledger,
125129
validators: Validators,
@@ -134,6 +138,7 @@ object SyncController {
134138
new SyncController(
135139
appStateStorage,
136140
blockchain,
141+
blockchainConfig,
137142
syncStateStorage,
138143
ledger,
139144
validators,

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ class BlockFetcher(
5353

5454
private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
5555
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
56-
peerEventBus ! Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers))
56+
peerEventBus ! Subscribe(
57+
MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers)
58+
)
5759
}
5860

5961
def handleCommonMessages(state: Option[BlockFetcherState]): Receive = { case PrintStatus =>
@@ -177,7 +179,7 @@ class BlockFetcher(
177179
}
178180

179181
fetchBlocks(newState)
180-
case MessageFromPeer(NewBlock(block, _), peerId) =>
182+
case MessageFromPeer(NewBlock(block, _, _), peerId) =>
181183
val newBlockNr = block.number
182184
val nextExpectedBlock = state.lastFullBlockNumber + 1
183185

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,24 @@ import io.iohk.ethereum.domain.{Block, Blockchain, Checkpoint, SignedTransaction
1414
import io.iohk.ethereum.ledger._
1515
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
1616
import io.iohk.ethereum.network.PeerId
17-
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
17+
import io.iohk.ethereum.network.p2p.messages.CommonMessages.{NewBlock, NewBlock63, NewBlock64}
1818
import io.iohk.ethereum.ommers.OmmersPool.AddOmmers
1919
import io.iohk.ethereum.transactions.PendingTransactionsManager
2020
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddUncheckedTransactions, RemoveTransactions}
21-
import io.iohk.ethereum.utils.ByteStringUtils
21+
import io.iohk.ethereum.utils.{BlockchainConfig, ByteStringUtils}
2222
import io.iohk.ethereum.utils.Config.SyncConfig
2323
import io.iohk.ethereum.utils.FunctorOps._
2424

2525
import scala.concurrent.{ExecutionContext, Future}
2626
import scala.concurrent.duration._
2727
import scala.util.{Failure, Success}
2828

29-
// scalastyle:off cyclomatic.complexity
29+
// scalastyle:off cyclomatic.complexity parameter.number
3030
class BlockImporter(
3131
fetcher: ActorRef,
3232
ledger: Ledger,
3333
blockchain: Blockchain,
34+
blockchainConfig: BlockchainConfig, //FIXME: this should not be needed after ETCM-280
3435
syncConfig: SyncConfig,
3536
ommersPool: ActorRef,
3637
broadcaster: ActorRef,
@@ -248,8 +249,22 @@ class BlockImporter(
248249
}
249250
}
250251

251-
private def broadcastBlocks(blocks: List[Block], totalDifficulties: List[BigInt]): Unit = {
252-
val newBlocks = (blocks, totalDifficulties).mapN(NewBlock.apply)
252+
private def broadcastBlocks(
253+
blocks: List[Block],
254+
totalDifficulties: List[BigInt]
255+
): Unit = {
256+
val constructNewBlock = {
257+
//FIXME: instead of choosing the message version based on block we should rely on the receiving
258+
// peer's `Capability`. To be addressed in ETCM-280
259+
if (blocks.lastOption.exists(_.number < blockchainConfig.ecip1097BlockNumber))
260+
NewBlock63.apply _
261+
else
262+
//FIXME: we should use checkpoint number corresponding to the block we're broadcasting. This will be addressed
263+
// in ETCM-263 by using ChainWeight for that block
264+
NewBlock64.apply(_, _, blockchain.getLatestCheckpointBlockNumber())
265+
}
266+
267+
val newBlocks = (blocks, totalDifficulties).mapN(constructNewBlock)
253268
broadcastNewBlocks(newBlocks)
254269
}
255270

@@ -316,6 +331,7 @@ object BlockImporter {
316331
fetcher: ActorRef,
317332
ledger: Ledger,
318333
blockchain: Blockchain,
334+
blockchainConfig: BlockchainConfig,
319335
syncConfig: SyncConfig,
320336
ommersPool: ActorRef,
321337
broadcaster: ActorRef,
@@ -327,6 +343,7 @@ object BlockImporter {
327343
fetcher,
328344
ledger,
329345
blockchain,
346+
blockchainConfig,
330347
syncConfig,
331348
ommersPool,
332349
broadcaster,

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
77
import io.iohk.ethereum.crypto.ECDSASignature
88
import io.iohk.ethereum.domain.{Block, Blockchain}
99
import io.iohk.ethereum.ledger.Ledger
10-
import io.iohk.ethereum.utils.ByteStringUtils
10+
import io.iohk.ethereum.utils.{BlockchainConfig, ByteStringUtils}
1111
import io.iohk.ethereum.utils.Config.SyncConfig
1212

1313
class RegularSync(
@@ -16,6 +16,7 @@ class RegularSync(
1616
peerEventBus: ActorRef,
1717
ledger: Ledger,
1818
blockchain: Blockchain,
19+
blockchainConfig: BlockchainConfig,
1920
syncConfig: SyncConfig,
2021
ommersPool: ActorRef,
2122
pendingTransactionsManager: ActorRef,
@@ -38,6 +39,7 @@ class RegularSync(
3839
fetcher,
3940
ledger,
4041
blockchain,
42+
blockchainConfig,
4143
syncConfig,
4244
ommersPool,
4345
broadcaster,
@@ -91,6 +93,7 @@ object RegularSync {
9193
peerEventBus: ActorRef,
9294
ledger: Ledger,
9395
blockchain: Blockchain,
96+
blockchainConfig: BlockchainConfig,
9497
syncConfig: SyncConfig,
9598
ommersPool: ActorRef,
9699
pendingTransactionsManager: ActorRef,
@@ -104,6 +107,7 @@ object RegularSync {
104107
peerEventBus,
105108
ledger,
106109
blockchain,
110+
blockchainConfig,
107111
syncConfig,
108112
ommersPool,
109113
pendingTransactionsManager,

src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,11 @@ case object BlockEnqueued extends BlockImportResult
274274

275275
case object DuplicateBlock extends BlockImportResult
276276

277-
case class ChainReorganised(oldBranch: List[Block], newBranch: List[Block], totalDifficulties: List[BigInt])
278-
extends BlockImportResult
277+
case class ChainReorganised(
278+
oldBranch: List[Block],
279+
newBranch: List[Block],
280+
totalDifficulties: List[BigInt]
281+
) extends BlockImportResult
279282

280283
case class BlockImportFailed(error: String) extends BlockImportResult
281284

src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class EtcPeerManagerActor(
143143
* @return new updated peer info
144144
*/
145145
private def handleReceivedMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo = {
146-
(updateTotalDifficulty(message) _
146+
(updateTotalDifficultyAndCheckpoint(message) _
147147
andThen updateForkAccepted(message, initialPeerWithInfo.peer)
148148
andThen updateMaxBlock(message))(initialPeerWithInfo.peerInfo)
149149
}
@@ -155,11 +155,15 @@ class EtcPeerManagerActor(
155155
* @param initialPeerInfo from before the message was processed
156156
* @return new peer info with the total difficulty updated
157157
*/
158-
private def updateTotalDifficulty(message: Message)(initialPeerInfo: PeerInfo): PeerInfo = message match {
159-
case newBlock: NewBlock =>
160-
initialPeerInfo.withTotalDifficulty(newBlock.totalDifficulty)
161-
case _ => initialPeerInfo
162-
}
158+
private def updateTotalDifficultyAndCheckpoint(message: Message)(initialPeerInfo: PeerInfo): PeerInfo =
159+
message match {
160+
case newBlock: NewBlock =>
161+
initialPeerInfo.copy(
162+
totalDifficulty = newBlock.totalDifficulty,
163+
latestCheckpointNumber = newBlock.latestCheckpointNumber
164+
)
165+
case _ => initialPeerInfo
166+
}
163167

164168
/**
165169
* Processes the message and updates if the fork block was accepted from the peer
@@ -228,11 +232,12 @@ class EtcPeerManagerActor(
228232

229233
object EtcPeerManagerActor {
230234

231-
val msgCodesWithInfo: Set[Int] = Set(BlockHeaders.code, NewBlock.code, NewBlockHashes.code)
235+
val msgCodesWithInfo: Set[Int] = Set(BlockHeaders.code, NewBlock.code63, NewBlock.code64, NewBlockHashes.code)
232236

233237
case class PeerInfo(
234238
remoteStatus: Status, // Updated only after handshaking
235239
totalDifficulty: BigInt,
240+
latestCheckpointNumber: BigInt,
236241
forkAccepted: Boolean,
237242
maxBlockNumber: BigInt,
238243
bestBlockHash: ByteString
@@ -257,7 +262,14 @@ object EtcPeerManagerActor {
257262

258263
object PeerInfo {
259264
def apply(remoteStatus: Status, forkAccepted: Boolean): PeerInfo = {
260-
PeerInfo(remoteStatus, remoteStatus.totalDifficulty, forkAccepted, 0, remoteStatus.bestHash)
265+
PeerInfo(
266+
remoteStatus,
267+
remoteStatus.totalDifficulty,
268+
remoteStatus.latestCheckpointNumber,
269+
forkAccepted,
270+
0,
271+
remoteStatus.bestHash
272+
)
261273
}
262274

263275
def withForkAccepted(remoteStatus: Status): PeerInfo = PeerInfo(remoteStatus, forkAccepted = true)

src/main/scala/io/iohk/ethereum/network/handshaker/EtcHandshaker.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import io.iohk.ethereum.domain.Blockchain
55
import io.iohk.ethereum.network.ForkResolver
66
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
77
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
8-
import io.iohk.ethereum.utils.NodeStatus
8+
import io.iohk.ethereum.utils.{BlockchainConfig, NodeStatus}
99
import java.util.concurrent.atomic.AtomicReference
1010

1111
case class EtcHandshaker private (
@@ -31,6 +31,7 @@ object EtcHandshaker {
3131
trait EtcHandshakerConfiguration {
3232
val nodeStatusHolder: AtomicReference[NodeStatus]
3333
val blockchain: Blockchain
34+
val blockchainConfig: BlockchainConfig
3435
val appStateStorage: AppStateStorage
3536
val peerConfiguration: PeerConfiguration
3637
val forkResolverOpt: Option[ForkResolver]

src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,15 @@ case class EtcNodeStatusExchangeState(handshakerConfiguration: EtcHandshakerConf
5252
private def createStatusMsg(): Status = {
5353
val bestBlockHeader = getBestBlockHeader()
5454
val totalDifficulty = blockchain.getTotalDifficultyByHash(bestBlockHeader.hash).get
55+
val latestCheckpointNumber =
56+
if (bestBlockHeader.number < blockchainConfig.ecip1097BlockNumber) None
57+
else Some(blockchain.getLatestCheckpointBlockNumber())
58+
5559
val status = Status(
5660
protocolVersion = Versions.PV63,
5761
networkId = peerConfiguration.networkId,
5862
totalDifficulty = totalDifficulty,
63+
latestCheckpointNumber = latestCheckpointNumber,
5964
bestHash = bestBlockHeader.hash,
6065
genesisHash = blockchain.genesisHeader.hash
6166
)

0 commit comments

Comments
 (0)