1
1
package io .iohk .ethereum .blockchain .sync
2
2
3
3
import java .time .Instant
4
-
5
4
import akka .actor ._
6
5
import akka .util .ByteString
7
6
import cats .data .NonEmptyList
@@ -25,7 +24,6 @@ import io.iohk.ethereum.network.p2p.messages.PV63._
25
24
import io .iohk .ethereum .utils .ByteStringUtils
26
25
import io .iohk .ethereum .utils .Config .SyncConfig
27
26
import org .bouncycastle .util .encoders .Hex
28
-
29
27
import scala .annotation .tailrec
30
28
import scala .concurrent .ExecutionContext .Implicits .global
31
29
import scala .concurrent .duration .{FiniteDuration , _ }
@@ -647,24 +645,25 @@ class FastSync(
647
645
} else {
648
646
val now = Instant .now()
649
647
val peers = unassignedPeers
650
- .filter(p => peerRequestsTime.get(p).forall(d => d.plusMillis(fastSyncThrottle.toMillis).isBefore(now)))
648
+ .filter(p => peerRequestsTime.get(p.peer ).forall(d => d.plusMillis(fastSyncThrottle.toMillis).isBefore(now)))
651
649
peers
652
650
.take(maxConcurrentRequests - assignedHandlers.size)
653
- .toSeq
654
- .sortBy(_.ref.toString())
651
+ .sortBy(_.info.maxBlockNumber)(Ordering [BigInt ].reverse)
655
652
.foreach(assignBlockchainWork)
656
653
}
657
654
}
658
655
659
- def assignBlockchainWork (peer : Peer ): Unit = {
656
+ def assignBlockchainWork (peerWithInfo : PeerWithInfo ): Unit = {
657
+ val PeerWithInfo (peer, peerInfo) = peerWithInfo
660
658
if (syncState.receiptsQueue.nonEmpty) {
661
659
requestReceipts(peer)
662
660
} else if (syncState.blockBodiesQueue.nonEmpty) {
663
661
requestBlockBodies(peer)
664
662
} else if (
665
663
requestedHeaders.isEmpty &&
666
664
context.child(BlockHeadersHandlerName ).isEmpty &&
667
- syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget
665
+ syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget &&
666
+ peerInfo.maxBlockNumber >= syncState.pivotBlock.number
668
667
) {
669
668
requestBlockHeaders(peer)
670
669
}
@@ -737,7 +736,8 @@ class FastSync(
737
736
peerRequestsTime += (peer -> Instant .now())
738
737
}
739
738
740
- def unassignedPeers : Set [Peer ] = peersToDownloadFrom.keySet diff assignedHandlers.values.toSet
739
+ def unassignedPeers : List [PeerWithInfo ] =
740
+ (peersToDownloadFrom -- assignedHandlers.values).map(PeerWithInfo .tupled).toList
741
741
742
742
def blockchainDataToDownload : Boolean =
743
743
syncState.blockChainWorkQueued || syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget
@@ -768,6 +768,8 @@ class FastSync(
768
768
769
769
object FastSync {
770
770
771
+ case class PeerWithInfo (peer : Peer , info : PeerInfo )
772
+
771
773
// scalastyle:off parameter.number
772
774
def props (
773
775
fastSyncStateStorage : FastSyncStateStorage ,
0 commit comments