Skip to content

Commit 955765e

Browse files
author
Michal Mrozek
committed
[ECTM-104] Pivot block selection algorithm
1 parent f421f79 commit 955765e

File tree

12 files changed

+1147
-530
lines changed

12 files changed

+1147
-530
lines changed

src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
6464
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
6565
_ <- peer1.waitForFastSyncFinish()
6666
} yield {
67-
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
68-
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
67+
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
68+
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset)
6969
}
7070
}
7171

@@ -81,13 +81,13 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
8181
val trie = peer1.getBestBlockTrie()
8282
// due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
8383
// state, so peer1 can get whole trie from both of them.
84-
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
85-
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
84+
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
85+
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset)
8686
assert(trie.isDefined)
8787
}
8888
}
8989

90-
it should "should update target block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
90+
it should "should update pivot block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
9191
case (peer1, peer2) =>
9292
for {
9393
_ <- peer2.importBlocksUntil(1000)(IdentityUpdate)
@@ -96,7 +96,7 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
9696
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
9797
_ <- peer1.waitForFastSyncFinish()
9898
} yield {
99-
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
99+
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
100100
}
101101
}
102102
}
@@ -308,7 +308,7 @@ object FastSyncItSpec {
308308
lazy val validators = new MockValidatorsAlwaysSucceed
309309

310310
val testSyncConfig = syncConfig.copy(
311-
minPeersToChooseTargetBlock = 1,
311+
minPeersToChoosePivotBlock = 1,
312312
peersScanInterval = 5.milliseconds,
313313
blockHeadersPerRequest = 200,
314314
blockBodiesPerRequest = 50,

src/main/resources/application.conf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,12 @@ mantis {
320320
# Requested number of MPT nodes when syncing from other peers
321321
nodes-per-request = 384
322322

323-
# Minimum number of peers required to start fast-sync (by determining the target block)
324-
min-peers-to-choose-target-block = 2
323+
# Minimum number of peers required to start fast-sync (by determining the pivot block)
324+
min-peers-to-choose-pivot-block = 3
325325

326326
# During fast-sync when most up to date block is determined from peers, the actual target block number
327327
# will be decreased by this value
328-
target-block-offset = 128
328+
pivot-block-offset = 128
329329

330330
# How often to query peers for new blocks after the top of the chain has been reached
331331
check-for-new-block-interval = 10.seconds

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 227 additions & 140 deletions
Large diffs are not rendered by default.
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package io.iohk.ethereum.blockchain.sync
2+
3+
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Scheduler}
4+
import io.iohk.ethereum.domain.BlockHeader
5+
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId}
6+
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
7+
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
8+
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
9+
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
10+
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
11+
import io.iohk.ethereum.utils.Config.SyncConfig
12+
import scala.concurrent.duration.FiniteDuration
13+
import scala.concurrent.ExecutionContext.Implicits.global
14+
15+
class FastSyncPivotBlockSelector(
16+
val etcPeerManager: ActorRef,
17+
val peerEventBus: ActorRef,
18+
val syncConfig: SyncConfig,
19+
val scheduler: Scheduler
20+
) extends Actor
21+
with ActorLogging
22+
with PeerListSupport
23+
with BlacklistSupport {
24+
25+
import FastSyncPivotBlockSelector._
26+
import syncConfig._
27+
28+
val fastSync: ActorRef = context.parent
29+
30+
def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages
31+
32+
override def receive: Receive = idle
33+
34+
def idle: Receive = handleCommonMessages orElse { case ChoosePivotBlock =>
35+
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
36+
(peer, maxBlockNumber)
37+
}
38+
39+
if (peersUsedToChooseTarget.size >= minPeersToChoosePivotBlock) {
40+
tryChooseTargetBlock(peersUsedToChooseTarget)
41+
} else {
42+
log.info(
43+
"Cannot pick pivot block. Need at least {} peers, but there are only {} available at the moment. Retrying in {}",
44+
minPeersToChoosePivotBlock,
45+
peersUsedToChooseTarget.size,
46+
startRetryInterval
47+
)
48+
scheduleRetry(startRetryInterval)
49+
context become idle
50+
}
51+
}
52+
53+
def tryChooseTargetBlock(peersWithBestBlockNumbers: Map[Peer, BigInt]): Unit = {
54+
val peersSortedByBestNumber = peersWithBestBlockNumbers.toList.sortBy(-_._2)
55+
val bestPeerBestBlockNumber = peersSortedByBestNumber.head._2
56+
val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
57+
val peersToAsk = peersSortedByBestNumber.takeWhile(_._2 >= expectedPivotBlock).map(_._1)
58+
59+
log.info(
60+
"Trying to choose fast sync pivot block using {} peers. The best block is {}. Ask {} peers for block nr {}",
61+
peersWithBestBlockNumbers.size,
62+
bestPeerBestBlockNumber,
63+
peersToAsk.size,
64+
expectedPivotBlock
65+
)
66+
67+
peersToAsk.foreach { peer =>
68+
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
69+
etcPeerManager ! EtcPeerManagerActor.SendMessage(
70+
GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false),
71+
peer.id
72+
)
73+
}
74+
75+
val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, PivotBlockTimeout)
76+
context become waitingForPivotBlock(peersToAsk.map(_.id).toSet, expectedPivotBlock, timeout, Map.empty)
77+
}
78+
79+
def waitingForPivotBlock(
80+
peersToAsk: Set[PeerId],
81+
targetBlockNumber: BigInt,
82+
timeout: Cancellable,
83+
headers: Map[BlockHeader, Int]
84+
): Receive =
85+
handleCommonMessages orElse {
86+
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
87+
peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
88+
val updatedPeersToAsk = peersToAsk - peerId
89+
val targetBlockHeaderOpt = blockHeaders.headers.find(header => header.number == targetBlockNumber)
90+
91+
targetBlockHeaderOpt match {
92+
case Some(targetBlockHeader) =>
93+
log.info("Received vote for {} from {}", targetBlockHeader.hashAsHexString, peerId.value)
94+
val newValue = headers.find(_._1 == targetBlockHeader).map(_._2 + 1).getOrElse(1)
95+
val updatedHeaders = headers.updated(targetBlockHeader, newValue)
96+
val (mostPopularBlockHeader, votes) = updatedHeaders.maxBy(_._2)
97+
if (votes >= minPeersToChoosePivotBlock) {
98+
timeout.cancel()
99+
sendResponseAndCleanup(mostPopularBlockHeader)
100+
} else if (updatedPeersToAsk.size + votes < minPeersToChoosePivotBlock) {
101+
timeout.cancel()
102+
peerEventBus ! Unsubscribe()
103+
log.info("Not enough votes for pivot block. Retrying in {}", startRetryInterval)
104+
scheduleRetry(startRetryInterval)
105+
context become idle
106+
} else {
107+
context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, updatedHeaders)
108+
}
109+
case None =>
110+
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header, blacklisting")
111+
context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, headers)
112+
}
113+
case PivotBlockTimeout =>
114+
peersToAsk.foreach { peerId =>
115+
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header (timeout), blacklisting")
116+
}
117+
peerEventBus ! Unsubscribe()
118+
log.info("Pivot block header receive timeout. Retrying in {}", startRetryInterval)
119+
scheduleRetry(startRetryInterval)
120+
context become idle
121+
}
122+
123+
def scheduleRetry(interval: FiniteDuration): Unit = {
124+
scheduler.scheduleOnce(interval, self, ChoosePivotBlock)
125+
}
126+
127+
def sendResponseAndCleanup(pivotBlockHeader: BlockHeader): Unit = {
128+
log.info("Found pivot block: {} hash: {}", pivotBlockHeader.number, pivotBlockHeader.hashAsHexString)
129+
fastSync ! Result(pivotBlockHeader)
130+
peerEventBus ! Unsubscribe()
131+
context stop self
132+
}
133+
134+
}
135+
136+
object FastSyncPivotBlockSelector {
137+
def props(etcPeerManager: ActorRef, peerEventBus: ActorRef, syncConfig: SyncConfig, scheduler: Scheduler): Props =
138+
Props(new FastSyncPivotBlockSelector(etcPeerManager: ActorRef, peerEventBus, syncConfig, scheduler))
139+
140+
case object ChoosePivotBlock
141+
case class Result(targetBlockHeader: BlockHeader)
142+
143+
private case object PivotBlockTimeout
144+
}

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSyncTargetBlockSelector.scala

Lines changed: 0 additions & 147 deletions
This file was deleted.

src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ object EtcPeerManagerActor {
230230
val msgCodesWithInfo: Set[Int] = Set(BlockHeaders.code, NewBlock.code, NewBlockHashes.code)
231231

232232
case class PeerInfo(
233-
remoteStatus: Status,
233+
remoteStatus: Status, // Updated only after handshaking
234234
totalDifficulty: BigInt,
235235
forkAccepted: Boolean,
236236
maxBlockNumber: BigInt,

0 commit comments

Comments
 (0)