Skip to content

Commit 6a31724

Browse files
author
Michal Mrozek
committed
[ECTM-104] Fix SyncControllerSpec
1 parent 8a0d99f commit 6a31724

File tree

5 files changed

+239
-235
lines changed

5 files changed

+239
-235
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class FastSync(
6969
val syncingHandler = new SyncingHandler(syncState)
7070
val targetBlockSelector = context.actorOf(
7171
FastSyncPivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler),
72-
"target-block-selector"
72+
"pivot-block-selector"
7373
)
7474
targetBlockSelector ! FastSyncPivotBlockSelector.ChoosePivotBlock
7575
context become syncingHandler.waitingForTargetBlockUpdate(ImportedLastBlock)
@@ -87,24 +87,24 @@ class FastSync(
8787
def startFromScratch(): Unit = {
8888
val targetBlockSelector = context.actorOf(
8989
FastSyncPivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler),
90-
"target-block-selector"
90+
"pivot-block-selector"
9191
)
9292
targetBlockSelector ! FastSyncPivotBlockSelector.ChoosePivotBlock
9393
context become waitingForTargetBlock
9494
}
9595

9696
def waitingForTargetBlock: Receive = handleCommonMessages orElse {
97-
case FastSyncPivotBlockSelector.Result(targetBlockHeader) =>
98-
if (targetBlockHeader.number < 1) {
97+
case FastSyncPivotBlockSelector.Result(pivotBlockHeader) =>
98+
if (pivotBlockHeader.number < 1) {
9999
log.info("Unable to start block synchronization in fast mode: target block is less than 1")
100100
appStateStorage.fastSyncDone().commit()
101101
context become idle
102102
syncController ! Done
103103
} else {
104104
val initialSyncState =
105105
SyncState(
106-
targetBlockHeader,
107-
safeDownloadTarget = targetBlockHeader.number + syncConfig.fastSyncBlockValidationX
106+
pivotBlockHeader,
107+
safeDownloadTarget = pivotBlockHeader.number + syncConfig.fastSyncBlockValidationX
108108
)
109109
startWithState(initialSyncState)
110110
}
Lines changed: 68 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package io.iohk.ethereum.blockchain.sync
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Scheduler}
4+
import akka.util.ByteString
45
import io.iohk.ethereum.domain.BlockHeader
5-
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId}
66
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
77
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
8-
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
98
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
9+
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
1010
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
11+
import io.iohk.ethereum.network.{EtcPeerManagerActor, PeerId}
1112
import io.iohk.ethereum.utils.Config.SyncConfig
12-
import scala.concurrent.duration.FiniteDuration
1313
import scala.concurrent.ExecutionContext.Implicits.global
14+
import scala.concurrent.duration.FiniteDuration
1415

1516
class FastSyncPivotBlockSelector(
1617
val etcPeerManager: ActorRef,
@@ -36,79 +37,88 @@ class FastSyncPivotBlockSelector(
3637
(peer, maxBlockNumber)
3738
}
3839

39-
if (peersUsedToChooseTarget.size >= minPeersToChoosePivotBlock) {
40-
tryChooseTargetBlock(peersUsedToChooseTarget)
40+
val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => -number }
41+
val bestPeerBestBlockNumber = peersSortedByBestNumber.headOption
42+
.map { case (_, bestPeerBestBlockNumber) => bestPeerBestBlockNumber }
43+
.getOrElse(BigInt(0))
44+
val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
45+
val peersToAsk = peersSortedByBestNumber
46+
.takeWhile { case (_, number) => number >= expectedPivotBlock }
47+
.map { case (peer, _) => peer }
48+
49+
if (peersToAsk.size >= minPeersToChoosePivotBlock) {
50+
log.info(
51+
"Trying to choose fast sync pivot block using {} peers. The best block is {}. Ask {} peers for block nr {}",
52+
peersUsedToChooseTarget.size,
53+
bestPeerBestBlockNumber,
54+
peersToAsk.size,
55+
expectedPivotBlock
56+
)
57+
58+
peersToAsk.foreach { peer =>
59+
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
60+
etcPeerManager ! EtcPeerManagerActor.SendMessage(
61+
GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false),
62+
peer.id
63+
)
64+
}
65+
66+
val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, PivotBlockTimeout)
67+
context become waitingForPivotBlock(peersToAsk.map(_.id).toSet, expectedPivotBlock, timeout, Map.empty)
4168
} else {
4269
log.info(
43-
"Cannot pick pivot block. Need at least {} peers, but there are only {} available at the moment. Retrying in {}",
70+
"Cannot pick pivot block. Need at least {} peers, but there are only {} which meet the criteria ({} all available at the moment). Retrying in {}",
4471
minPeersToChoosePivotBlock,
72+
peersToAsk.size,
4573
peersUsedToChooseTarget.size,
4674
startRetryInterval
4775
)
4876
scheduleRetry(startRetryInterval)
49-
context become idle
5077
}
5178
}
5279

53-
def tryChooseTargetBlock(peersWithBestBlockNumbers: Map[Peer, BigInt]): Unit = {
54-
val peersSortedByBestNumber = peersWithBestBlockNumbers.toList.sortBy(-_._2)
55-
val bestPeerBestBlockNumber = peersSortedByBestNumber.head._2
56-
val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
57-
val peersToAsk = peersSortedByBestNumber.takeWhile(_._2 >= expectedPivotBlock).map(_._1)
58-
59-
log.info(
60-
"Trying to choose fast sync pivot block using {} peers. The best block is {}. Ask {} peers for block nr {}",
61-
peersWithBestBlockNumbers.size,
62-
bestPeerBestBlockNumber,
63-
peersToAsk.size,
64-
expectedPivotBlock
65-
)
66-
67-
peersToAsk.foreach { peer =>
68-
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
69-
etcPeerManager ! EtcPeerManagerActor.SendMessage(
70-
GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false),
71-
peer.id
72-
)
73-
}
74-
75-
val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, PivotBlockTimeout)
76-
context become waitingForPivotBlock(peersToAsk.map(_.id).toSet, expectedPivotBlock, timeout, Map.empty)
77-
}
78-
7980
def waitingForPivotBlock(
8081
peersToAsk: Set[PeerId],
8182
targetBlockNumber: BigInt,
8283
timeout: Cancellable,
83-
headers: Map[BlockHeader, Int]
84+
headers: Map[ByteString, BlockHeaderWithVotes]
8485
): Receive =
8586
handleCommonMessages orElse {
8687
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
8788
peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
8889
val updatedPeersToAsk = peersToAsk - peerId
89-
val targetBlockHeaderOpt = blockHeaders.headers.find(header => header.number == targetBlockNumber)
90-
90+
val targetBlockHeaderOpt =
91+
if (blockHeaders.headers.size != 1) None
92+
else
93+
blockHeaders.headers.find(header => header.number == targetBlockNumber)
9194
targetBlockHeaderOpt match {
9295
case Some(targetBlockHeader) =>
9396
log.info("Received vote for {} from {}", targetBlockHeader.hashAsHexString, peerId.value)
94-
val newValue = headers.find(_._1 == targetBlockHeader).map(_._2 + 1).getOrElse(1)
95-
val updatedHeaders = headers.updated(targetBlockHeader, newValue)
96-
val (mostPopularBlockHeader, votes) = updatedHeaders.maxBy(_._2)
97-
if (votes >= minPeersToChoosePivotBlock) {
97+
val newValue =
98+
headers.get(targetBlockHeader.hash).map(_.vote).getOrElse(BlockHeaderWithVotes(targetBlockHeader))
99+
val updatedHeaders = headers.updated(targetBlockHeader.hash, newValue)
100+
val BlockHeaderWithVotes(mostPopularBlockHeader, updatedVotes) = updatedHeaders.mostVotedHeader
101+
if (updatedVotes >= minPeersToChoosePivotBlock) {
98102
timeout.cancel()
99103
sendResponseAndCleanup(mostPopularBlockHeader)
100-
} else if (updatedPeersToAsk.size + votes < minPeersToChoosePivotBlock) {
104+
} else if (!isPossibleToReachConsensus(updatedPeersToAsk.size, updatedVotes)) {
101105
timeout.cancel()
102106
peerEventBus ! Unsubscribe()
103107
log.info("Not enough votes for pivot block. Retrying in {}", startRetryInterval)
104108
scheduleRetry(startRetryInterval)
105-
context become idle
106109
} else {
107110
context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, updatedHeaders)
108111
}
109112
case None =>
110113
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header, blacklisting")
111-
context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, headers)
114+
val BlockHeaderWithVotes(_, votes) = headers.mostVotedHeader
115+
if (!isPossibleToReachConsensus(updatedPeersToAsk.size, votes)) {
116+
timeout.cancel()
117+
log.info("Not enough votes for pivot block. Retrying in {}", startRetryInterval)
118+
peerEventBus ! Unsubscribe()
119+
scheduleRetry(startRetryInterval)
120+
} else
121+
context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, headers)
112122
}
113123
case PivotBlockTimeout =>
114124
peersToAsk.foreach { peerId =>
@@ -117,11 +127,14 @@ class FastSyncPivotBlockSelector(
117127
peerEventBus ! Unsubscribe()
118128
log.info("Pivot block header receive timeout. Retrying in {}", startRetryInterval)
119129
scheduleRetry(startRetryInterval)
120-
context become idle
121130
}
122131

132+
private def isPossibleToReachConsensus(peersLeft: Int, bestHeaderVotes: Int): Boolean =
133+
peersLeft + bestHeaderVotes >= minPeersToChoosePivotBlock
134+
123135
def scheduleRetry(interval: FiniteDuration): Unit = {
124136
scheduler.scheduleOnce(interval, self, ChoosePivotBlock)
137+
context become idle
125138
}
126139

127140
def sendResponseAndCleanup(pivotBlockHeader: BlockHeader): Unit = {
@@ -140,5 +153,15 @@ object FastSyncPivotBlockSelector {
140153
case object ChoosePivotBlock
141154
case class Result(targetBlockHeader: BlockHeader)
142155

143-
private case object PivotBlockTimeout
156+
case object PivotBlockTimeout
157+
158+
case class BlockHeaderWithVotes(header: BlockHeader, votes: Int = 1) {
159+
def vote: BlockHeaderWithVotes = copy(votes = votes + 1)
160+
}
161+
162+
implicit class SortableHeadersMap(headers: Map[ByteString, BlockHeaderWithVotes]) {
163+
def mostVotedHeader: BlockHeaderWithVotes = headers.maxBy { case (_, headerWithVotes) =>
164+
headerWithVotes.votes
165+
}._2
166+
}
144167
}

src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@ trait PeerListSupport {
2222

2323
var handshakedPeers: PeersMap = Map.empty
2424

25-
scheduler.scheduleWithFixedDelay(0.seconds, syncConfig.peersScanInterval, etcPeerManager, EtcPeerManagerActor.GetHandshakedPeers)(global, context.self)
25+
scheduler.scheduleWithFixedDelay(
26+
0.seconds,
27+
syncConfig.peersScanInterval,
28+
etcPeerManager,
29+
EtcPeerManagerActor.GetHandshakedPeers
30+
)(global, context.self)
2631

2732
def removePeer(peerId: PeerId): Unit = {
2833
peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId)))

src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncPivotBlockSelectorSpec.scala

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@ class FastSyncPivotBlockSelectorSpec extends AnyFlatSpec with Matchers with Befo
6767
peerMessageBus.expectMsg(Unsubscribe())
6868
}
6969

70-
it should "ask for the block number 0 if [bestPeerBestBlockNumber < syncConfig.targetBlockOffset]" in new TestSetup {
70+
it should "ask for the block number 0 if [bestPeerBestBlockNumber < syncConfig.pivotBlockOffset]" in new TestSetup {
7171
val highestNumber = syncConfig.pivotBlockOffset - 1
7272

7373
updateHandshakedPeers(
7474
HandshakedPeers(
75-
allPeers
75+
threeAcceptedPeers
7676
.updated(peer1, threeAcceptedPeers(peer1).copy(maxBlockNumber = highestNumber))
7777
.updated(peer2, threeAcceptedPeers(peer2).copy(maxBlockNumber = highestNumber / 2))
7878
.updated(peer3, threeAcceptedPeers(peer3).copy(maxBlockNumber = highestNumber / 5))
@@ -229,7 +229,7 @@ class FastSyncPivotBlockSelectorSpec extends AnyFlatSpec with Matchers with Befo
229229
peerMessageBus.expectMsg(Unsubscribe())
230230
}
231231

232-
it should "check only peers with the highest block at least equal to [bestPeerBestBlockNumber - syncConfig.targetBlockOffset]" in new TestSetup {
232+
it should "check only peers with the highest block at least equal to [bestPeerBestBlockNumber - syncConfig.pivotBlockOffset]" in new TestSetup {
233233
updateHandshakedPeers(
234234
HandshakedPeers(allPeers.updated(peer1, allPeers(peer1).copy(maxBlockNumber = expectedPivotBlock - 1)))
235235
)
@@ -244,6 +244,40 @@ class FastSyncPivotBlockSelectorSpec extends AnyFlatSpec with Matchers with Befo
244244
peerMessageBus.expectNoMessage() // Peer 1 will be skipped
245245
}
246246

247+
it should "only use only peers from the correct network to choose pivot block" in new TestSetup() {
248+
updateHandshakedPeers(HandshakedPeers(peersFromDifferentNetworks))
249+
250+
pivotBlockSelector ! ChoosePivotBlock
251+
252+
peerMessageBus.expectMsgAllOf(
253+
Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id))),
254+
// Peer 2 is skipped
255+
Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer3.id))),
256+
Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer4.id)))
257+
)
258+
peerMessageBus.expectNoMessage()
259+
260+
etcPeerManager.expectMsgAllOf(
261+
EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false), peer1.id),
262+
EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false), peer3.id),
263+
EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false), peer4.id)
264+
)
265+
266+
// Collecting pivot block (for voting)
267+
pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(pivotBlockHeader)), peer1.id)
268+
pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(pivotBlockHeader)), peer3.id)
269+
pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(pivotBlockHeader)), peer4.id)
270+
271+
peerMessageBus.expectMsgAllOf(
272+
Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id))),
273+
Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer3.id))),
274+
Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer4.id)))
275+
)
276+
277+
fastSync.expectMsg(Result(pivotBlockHeader))
278+
peerMessageBus.expectMsg(Unsubscribe())
279+
}
280+
247281
class TestSetup extends TestSyncConfig {
248282

249283
private def isNewBlock(msg: Message): Boolean = msg match {
@@ -377,6 +411,37 @@ class FastSyncPivotBlockSelectorSpec extends AnyFlatSpec with Matchers with Befo
377411
)
378412
)
379413

414+
val peersFromDifferentNetworks = Map(
415+
peer1 -> PeerInfo(
416+
peer1Status,
417+
forkAccepted = true,
418+
totalDifficulty = peer1Status.totalDifficulty,
419+
maxBlockNumber = bestBlock,
420+
bestBlockHash = peer1Status.bestHash
421+
),
422+
peer2 -> PeerInfo(
423+
peer2Status,
424+
forkAccepted = false,
425+
totalDifficulty = peer1Status.totalDifficulty,
426+
maxBlockNumber = bestBlock,
427+
bestBlockHash = peer2Status.bestHash
428+
),
429+
peer3 -> PeerInfo(
430+
peer3Status,
431+
forkAccepted = true,
432+
totalDifficulty = peer1Status.totalDifficulty,
433+
maxBlockNumber = bestBlock,
434+
bestBlockHash = peer3Status.bestHash
435+
),
436+
peer4 -> PeerInfo(
437+
peer4Status,
438+
forkAccepted = true,
439+
totalDifficulty = peer1Status.totalDifficulty,
440+
maxBlockNumber = bestBlock,
441+
bestBlockHash = peer4Status.bestHash
442+
)
443+
)
444+
380445
def updateHandshakedPeers(handshakedPeers: HandshakedPeers): Unit = pivotBlockSelector ! handshakedPeers
381446
}
382447
}

0 commit comments

Comments
 (0)