@@ -167,6 +167,8 @@ trait FastSync {
167
167
168
168
syncStateStorageActor ! fastSyncStateStorage
169
169
170
+ private var blockChainOnlyPeers = Set .empty[Peer ]
171
+
170
172
private val syncStatePersistCancellable =
171
173
scheduler.schedule(persistStateSnapshotInterval, persistStateSnapshotInterval) {
172
174
syncStateStorageActor ! SyncState (
@@ -181,7 +183,7 @@ trait FastSync {
181
183
182
184
private val heartBeat = scheduler.schedule(syncRetryInterval, syncRetryInterval * 2 , self, ProcessSyncing )
183
185
184
- def receive : Receive = handlePeerUpdates orElse {
186
+ def receive : Receive = handlePeerUpdates orElse handleFailingMptPeers orElse {
185
187
case EnqueueNodes (hashes) =>
186
188
hashes.foreach {
187
189
case h : EvmCodeHash => nonMptNodesQueue = h +: nonMptNodesQueue
@@ -227,6 +229,11 @@ trait FastSync {
227
229
printStatus()
228
230
}
229
231
232
+ private def handleFailingMptPeers : Receive = {
233
+ case BlockChainOnlyDownload (peer) =>
234
+ blockChainOnlyPeers = blockChainOnlyPeers + peer
235
+ }
236
+
230
237
private def printStatus () = {
231
238
val totalNodesCount = downloadedNodesCount + mptNodesQueue.size + nonMptNodesQueue.size
232
239
val formatPeer : (Peer ) => String = peer => s " ${peer.remoteAddress.getAddress.getHostAddress}: ${peer.remoteAddress.getPort}"
@@ -316,22 +323,33 @@ trait FastSync {
316
323
scheduler.scheduleOnce(syncRetryInterval, self, ProcessSyncing )
317
324
}
318
325
} else {
319
- unassignedPeers
326
+ val peers = unassignedPeers
327
+ (peers -- blockChainOnlyPeers)
320
328
.take(maxConcurrentRequests - assignedHandlers.size)
321
329
.foreach(assignWork)
330
+ peers
331
+ .intersect(blockChainOnlyPeers)
332
+ .take(maxConcurrentRequests - assignedHandlers.size)
333
+ .foreach(assignBlockChainWork)
322
334
}
323
335
}
324
336
325
337
def assignWork (peer : Peer ): Unit = {
338
+ if (nonMptNodesQueue.nonEmpty || mptNodesQueue.nonEmpty) {
339
+ requestNodes(peer)
340
+ } else {
341
+ assignBlockChainWork(peer)
342
+ }
343
+ }
344
+
345
+ def assignBlockChainWork (peer : Peer ): Unit = {
326
346
if (receiptsQueue.nonEmpty) {
327
347
requestReceipts(peer)
328
348
} else if (blockBodiesQueue.nonEmpty) {
329
349
requestBlockBodies(peer)
330
350
} else if (context.child(blockHeadersHandlerName).isEmpty &&
331
351
initialSyncState.targetBlock.number > bestBlockHeaderNumber) {
332
352
requestBlockHeaders(peer)
333
- } else if (nonMptNodesQueue.nonEmpty || mptNodesQueue.nonEmpty) {
334
- requestNodes(peer)
335
353
}
336
354
}
337
355
@@ -423,6 +441,7 @@ object FastSync {
423
441
private case object TargetBlockTimeout
424
442
425
443
private case object ProcessSyncing
444
+ case class BlockChainOnlyDownload (peer : Peer )
426
445
427
446
case class SyncState (
428
447
targetBlock : BlockHeader ,
0 commit comments