Skip to content

Commit 852133b

Browse files
authored
Etcm 1055 Adjust PeersClient.nextBestBlock() (#1101)
* ETCM-1063: Add tests for FetcherService and PeersClient * ETCM-1055: Make PeersClient.nextBestPeer fetch a peer with a different chain
1 parent 6ff7f2a commit 852133b

File tree

3 files changed

+45
-38
lines changed

3 files changed

+45
-38
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.iohk.ethereum.blockchain
2+
3+
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
4+
5+
object PeerComparator {
6+
7+
def doPeersHaveSameBestBlock(peerInfo1: PeerInfo, peerInfo2: PeerInfo): Boolean =
8+
peerInfo1.bestBlockHash == peerInfo2.bestBlockHash
9+
}

src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import scala.concurrent.ExecutionContext
1414
import scala.jdk.CollectionConverters._
1515
import scala.reflect.ClassTag
1616

17+
import io.iohk.ethereum.blockchain.PeerComparator
1718
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
1819
import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
1920
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
@@ -43,6 +44,10 @@ class PeersClient(
4344
val statusSchedule: Cancellable =
4445
scheduler.scheduleWithFixedDelay(syncConfig.printStatusInterval, syncConfig.printStatusInterval, self, PrintStatus)
4546

47+
val numberOfPeersToFetchFrom = syncConfig.peersToFetchFrom
48+
49+
val activeFetchingNodes: mutable.Set[PeerWithInfo] = lruSet(numberOfPeersToFetchFrom)
50+
4651
def receive: Receive = running(Map())
4752

4853
override def postStop(): Unit = {
@@ -100,7 +105,7 @@ class PeersClient(
100105
private def selectPeer(peerSelector: PeerSelector): Option[Peer] =
101106
peerSelector match {
102107
case BestPeer => bestPeer(peersToDownloadFrom)
103-
case NextBestPeer => nextBestPeer(peersToDownloadFrom)
108+
case NextBestPeer => nextBestPeer(peersToDownloadFrom, activeFetchingNodes.toSet)
104109
}
105110

106111
private def responseClassTag[RequestMsg <: Message](requestMsg: RequestMsg): ClassTag[_ <: Message] =
@@ -149,11 +154,6 @@ object PeersClient {
149154

150155
type Requesters = Map[ActorRef, ActorRef]
151156

152-
//TODO: get the value from syncConfig.peersToFetchFrom
153-
val numberOfPeersToFetchFrom = 1
154-
155-
val activeFetchingNodes: mutable.Set[Peer] = lruSet[Peer](numberOfPeersToFetchFrom)
156-
157157
private def lruSet[A](maxEntries: Int): mutable.Set[A] =
158158
newSetFromMap[A](new java.util.LinkedHashMap[A, java.lang.Boolean]() {
159159
override def removeEldestEntry(eldest: java.util.Map.Entry[A, java.lang.Boolean]): Boolean = size > maxEntries
@@ -209,18 +209,23 @@ object PeersClient {
209209
}
210210

211211
//returns the next best peer after the one already returned previously
212-
//TODO: make sure the next best peer has a different best block, so we don't fetch in parallel identical branches
213212
//TODO: whenever this method is called - do activeFetchingNodes.add(_) on the peer returned
214-
def nextBestPeer(peersToDownloadFrom: Map[PeerId, PeerWithInfo]): Option[Peer] = {
213+
def nextBestPeer(
214+
peersToDownloadFrom: Map[PeerId, PeerWithInfo],
215+
activeFetchingNodes: Set[PeerWithInfo]
216+
): Option[Peer] = {
215217
val peersToUse = peersToDownloadFrom.values
216-
.collect { case PeerWithInfo(peer, PeerInfo(_, chainWeight, true, _, _)) =>
217-
(peer, chainWeight)
218+
.collect { case PeerWithInfo(peer, peerInfo @ PeerInfo(_, _, true, _, _)) =>
219+
(peer, peerInfo)
218220
}
219221

220222
val peer =
221223
peersToUse
222-
.filter { case (peer, _) => !activeFetchingNodes.contains(peer) }
223-
.maxByOption { case (_, weight) => weight }
224+
.filterNot { case (peer, _) => activeFetchingNodes.map(_.peer).contains(peer) }
225+
.filterNot { case (_, peerInfo) =>
226+
activeFetchingNodes.map(_.peerInfo).exists(PeerComparator.doPeersHaveSameBestBlock(peerInfo, _))
227+
}
228+
.maxByOption { case (_, peerInfo) => peerInfo.chainWeight }
224229
.map { case (peer, _) => peer }
225230
peer
226231
}

src/test/scala/io/iohk/ethereum/blockchain/sync/PeersClientSpec.scala

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import akka.actor.ActorSystem
66
import akka.testkit.TestProbe
77
import akka.util.ByteString
88

9+
import scala.collection.mutable
10+
911
import org.scalatest.flatspec.AnyFlatSpec
1012
import org.scalatest.matchers.should.Matchers
1113
import org.scalatest.prop.TableFor3
@@ -74,14 +76,14 @@ class PeersClientSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyC
7476
}
7577
}
7678

77-
it should "determine the next best peer (same as bestPeer when lru set is empty)" in {
79+
it should "determine the next best peer (same as bestPeer when lru set is empty)" in {
7880
forAll(table) { (peerInfoMap, expectedPeer, _) =>
79-
PeersClient.nextBestPeer(peerInfoMap) shouldEqual expectedPeer
81+
PeersClient.nextBestPeer(peerInfoMap, Set.empty) shouldEqual expectedPeer
8082
}
8183
}
8284

83-
it should "determine the next best peer when lru is used" in {
84-
val table = Table[Map[PeerId, PeerWithInfo], Option[Peer], Option[Peer], String](
85+
it should "determine the next best peer with a different best block each time" in {
86+
val table = Table[Map[PeerId, PeerWithInfo], Option[PeerWithInfo], Option[Peer], String](
8587
("PeerInfo map", "Used best peer", "Expected best peer", "Scenario info (selected peer)"),
8688
(
8789
Map(),
@@ -100,40 +102,31 @@ class PeersClientSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyC
100102
peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100, fork = false)),
101103
peer2.id -> PeerWithInfo(peer2, peerInfo(0, 50, fork = true))
102104
),
103-
Some(peer2),
105+
Some(PeerWithInfo(peer2, peerInfo(0, 50, fork = true))),
104106
None,
105-
"Peer2 with lower TD but following the ETC fork"
107+
"Peer2 with lower TD but following the ETC fork, peer2 is already used"
106108
),
107109
(
108110
Map(peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), peer2.id -> PeerWithInfo(peer2, peerInfo(0, 101))),
109-
Some(peer2),
110-
Some(peer1),
111-
"Peer2 with higher TD"
112-
),
113-
(
114-
Map(
115-
peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)),
116-
peer2.id -> PeerWithInfo(peer2, peerInfo(0, 101)),
117-
peer3.id -> PeerWithInfo(peer3, peerInfo(1, 50))
118-
),
119-
Some(peer3),
120-
Some(peer2),
121-
"Peer3 with lower TD but higher checkpoint number"
111+
Some(PeerWithInfo(peer2, peerInfo(0, 101))),
112+
None,
113+
"Both peer are used"
122114
),
123115
(
124116
Map(
125117
peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)),
126-
peer2.id -> PeerWithInfo(peer2, peerInfo(4, 101)),
127-
peer3.id -> PeerWithInfo(peer3, peerInfo(4, 50))
118+
peer2.id -> PeerWithInfo(peer2, peerInfo(1, 50)),
119+
peer3.id -> PeerWithInfo(peer3, peerInfo(0, 80).copy(bestBlockHash = ByteString("differenthash")))
128120
),
129-
Some(peer2),
121+
Some(PeerWithInfo(peer2, peerInfo(1, 50))),
130122
Some(peer3),
131-
"Peer2 with equal checkpoint number and higher TD"
123+
"Peer2 with lower TD but higher checkpoint number, peer 1 and 2 are used"
132124
)
133125
)
134-
forAll(table) { (peerInfoMap, usedPeer, expectedPeer, _) =>
135-
usedPeer.map(PeersClient.activeFetchingNodes.add)
136-
PeersClient.nextBestPeer(peerInfoMap) shouldEqual expectedPeer
126+
val activeFetchingNodes: mutable.Set[PeerWithInfo] = mutable.Set.empty
127+
forAll(table) { (peerInfoMap, usedPeerWithInfo, expectedPeer, _) =>
128+
usedPeerWithInfo.map(activeFetchingNodes.add)
129+
PeersClient.nextBestPeer(peerInfoMap, activeFetchingNodes.toSet) shouldEqual expectedPeer
137130
}
138131
}
139132

0 commit comments

Comments
 (0)