@@ -8,60 +8,48 @@ import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
8
8
import io .iohk .ethereum .network .PeerEventBusActor .SubscriptionClassifier .MessageClassifier
9
9
import io .iohk .ethereum .network .PeerEventBusActor .{PeerSelector , Subscribe , Unsubscribe }
10
10
import io .iohk .ethereum .network .p2p .messages .PV62 .{BlockHeaders , GetBlockHeaders }
11
- import io .iohk .ethereum .network .{EtcPeerManagerActor , PeerId }
11
+ import io .iohk .ethereum .network .{EtcPeerManagerActor , Peer , PeerId }
12
12
import io .iohk .ethereum .utils .Config .SyncConfig
13
13
import scala .concurrent .ExecutionContext .Implicits .global
14
14
import scala .concurrent .duration .FiniteDuration
15
15
16
- class FastSyncPivotBlockSelector (
16
+ class PivotBlockSelector (
17
17
val etcPeerManager : ActorRef ,
18
18
val peerEventBus : ActorRef ,
19
19
val syncConfig : SyncConfig ,
20
- val scheduler : Scheduler
20
+ val scheduler : Scheduler ,
21
+ fastSync : ActorRef
21
22
) extends Actor
22
23
with ActorLogging
23
24
with PeerListSupport
24
25
with BlacklistSupport {
25
26
26
- import FastSyncPivotBlockSelector ._
27
+ import PivotBlockSelector ._
27
28
import syncConfig ._
28
29
29
- val fastSync : ActorRef = context.parent
30
-
31
30
def handleCommonMessages : Receive = handlePeerListMessages orElse handleBlacklistMessages
32
31
33
32
override def receive : Receive = idle
34
33
35
- def idle : Receive = handleCommonMessages orElse { case ChoosePivotBlock =>
36
- val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo (_, _, true , maxBlockNumber, _)) =>
37
- (peer, maxBlockNumber)
38
- }
39
-
40
- val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => - number }
41
- val bestPeerBestBlockNumber = peersSortedByBestNumber.headOption
42
- .map { case (_, bestPeerBestBlockNumber) => bestPeerBestBlockNumber }
43
- .getOrElse(BigInt (0 ))
44
- val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0 )
45
- val correctPeers = peersSortedByBestNumber
46
- .takeWhile { case (_, number) => number >= expectedPivotBlock }
47
- .map { case (peer, _) => peer }
34
+ def idle : Receive = handleCommonMessages orElse { case SelectPivotBlock =>
35
+ val election @ ElectionDetails (correctPeers, expectedPivotBlock) = collectVoters
48
36
49
- if (correctPeers.size >= minPeersToChoosePivotBlock) {
37
+ if (election.isEnoughVoters( minPeersToChoosePivotBlock) ) {
50
38
51
39
val (peersToAsk, waitingPeers) = correctPeers.splitAt(minPeersToChoosePivotBlock + peersToChoosePivotBlockMargin)
52
40
53
41
log.info(
54
42
" Trying to choose fast sync pivot block using {} peers ({} correct ones). Ask {} peers for block nr {}" ,
55
- peersUsedToChooseTarget .size,
43
+ peersToDownloadFrom .size,
56
44
correctPeers.size,
57
45
peersToAsk.size,
58
46
expectedPivotBlock
59
47
)
60
48
61
49
peersToAsk.foreach(peer => obtainBlockHeaderFromPeer(peer.id, expectedPivotBlock))
62
50
63
- val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, PivotBlockTimeout )
64
- context become waitingForPivotBlock (
51
+ val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, ElectionPivotBlockTimeout )
52
+ context become runningPivotBlockElection (
65
53
peersToAsk.map(_.id).toSet,
66
54
waitingPeers.map(_.id),
67
55
expectedPivotBlock,
@@ -73,14 +61,14 @@ class FastSyncPivotBlockSelector(
73
61
" Cannot pick pivot block. Need at least {} peers, but there are only {} which meet the criteria ({} all available at the moment). Retrying in {}" ,
74
62
minPeersToChoosePivotBlock,
75
63
correctPeers.size,
76
- peersUsedToChooseTarget .size,
64
+ peersToDownloadFrom .size,
77
65
startRetryInterval
78
66
)
79
67
scheduleRetry(startRetryInterval)
80
68
}
81
69
}
82
70
83
- def waitingForPivotBlock (
71
+ def runningPivotBlockElection (
84
72
peersToAsk : Set [PeerId ],
85
73
waitingPeers : List [PeerId ],
86
74
pivotBlockNumber : BigInt ,
@@ -105,7 +93,7 @@ class FastSyncPivotBlockSelector(
105
93
blacklist(peerId, blacklistDuration, " Did not respond with pivot block header, blacklisting" )
106
94
votingProcess(updatedPeersToAsk, waitingPeers, pivotBlockNumber, timeout, headers)
107
95
}
108
- case PivotBlockTimeout =>
96
+ case ElectionPivotBlockTimeout =>
109
97
peersToAsk.foreach { peerId =>
110
98
blacklist(peerId, blacklistDuration, " Did not respond with pivot block header (timeout), blacklisting" )
111
99
}
@@ -130,12 +118,12 @@ class FastSyncPivotBlockSelector(
130
118
} else if (! isPossibleToReachConsensus(peersToAsk.size, updatedVotes)) {
131
119
timeout.cancel()
132
120
if (waitingPeers.nonEmpty) { // There are more peers to ask
133
- val newTimeout = scheduler.scheduleOnce(peerResponseTimeout, self, PivotBlockTimeout )
121
+ val newTimeout = scheduler.scheduleOnce(peerResponseTimeout, self, ElectionPivotBlockTimeout )
134
122
val additionalPeer :: newWaitingPeers = waitingPeers
135
123
136
124
obtainBlockHeaderFromPeer(additionalPeer, pivotBlockNumber)
137
125
138
- context become waitingForPivotBlock (
126
+ context become runningPivotBlockElection (
139
127
peersToAsk + additionalPeer,
140
128
newWaitingPeers,
141
129
pivotBlockNumber,
@@ -149,7 +137,7 @@ class FastSyncPivotBlockSelector(
149
137
}
150
138
// Continue voting
151
139
} else {
152
- context become waitingForPivotBlock (
140
+ context become runningPivotBlockElection (
153
141
peersToAsk,
154
142
waitingPeers,
155
143
pivotBlockNumber,
@@ -163,7 +151,7 @@ class FastSyncPivotBlockSelector(
163
151
peersLeft + bestHeaderVotes >= minPeersToChoosePivotBlock
164
152
165
153
def scheduleRetry (interval : FiniteDuration ): Unit = {
166
- scheduler.scheduleOnce(interval, self, ChoosePivotBlock )
154
+ scheduler.scheduleOnce(interval, self, SelectPivotBlock )
167
155
context become idle
168
156
}
169
157
@@ -181,16 +169,39 @@ class FastSyncPivotBlockSelector(
181
169
peer
182
170
)
183
171
}
184
- }
185
172
186
- object FastSyncPivotBlockSelector {
187
- def props (etcPeerManager : ActorRef , peerEventBus : ActorRef , syncConfig : SyncConfig , scheduler : Scheduler ): Props =
188
- Props (new FastSyncPivotBlockSelector (etcPeerManager : ActorRef , peerEventBus, syncConfig, scheduler))
173
+ private def collectVoters : ElectionDetails = {
174
+ val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo (_, _, true , maxBlockNumber, _)) =>
175
+ (peer, maxBlockNumber)
176
+ }
189
177
190
- case object ChoosePivotBlock
178
+ val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => - number }
179
+ val bestPeerBestBlockNumber = peersSortedByBestNumber.headOption
180
+ .map { case (_, bestPeerBestBlockNumber) => bestPeerBestBlockNumber }
181
+ .getOrElse(BigInt (0 ))
182
+ val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0 )
183
+ val correctPeers = peersSortedByBestNumber
184
+ .takeWhile { case (_, number) => number >= expectedPivotBlock }
185
+ .map { case (peer, _) => peer }
186
+
187
+ ElectionDetails (correctPeers, expectedPivotBlock)
188
+ }
189
+ }
190
+
191
+ object PivotBlockSelector {
192
+ def props (
193
+ etcPeerManager : ActorRef ,
194
+ peerEventBus : ActorRef ,
195
+ syncConfig : SyncConfig ,
196
+ scheduler : Scheduler ,
197
+ fastSync : ActorRef
198
+ ): Props =
199
+ Props (new PivotBlockSelector (etcPeerManager : ActorRef , peerEventBus, syncConfig, scheduler, fastSync))
200
+
201
+ case object SelectPivotBlock
191
202
case class Result (targetBlockHeader : BlockHeader )
192
203
193
- case object PivotBlockTimeout
204
+ case object ElectionPivotBlockTimeout
194
205
195
206
case class BlockHeaderWithVotes (header : BlockHeader , votes : Int = 1 ) {
196
207
def vote : BlockHeaderWithVotes = copy(votes = votes + 1 )
@@ -201,4 +212,8 @@ object FastSyncPivotBlockSelector {
201
212
headerWithVotes.votes
202
213
}._2
203
214
}
215
+
216
+ case class ElectionDetails (participants : List [Peer ], expectedPivotBlock : BigInt ) {
217
+ def isEnoughVoters (minNumberOfVoters : Int ): Boolean = participants.size >= minNumberOfVoters
218
+ }
204
219
}
0 commit comments