Skip to content

Commit c662b6b

Browse files
committed
ETCM-446: Use the average received response per second as priority for pruning.
1 parent d162592 commit c662b6b

File tree

3 files changed

+116
-49
lines changed

3 files changed

+116
-49
lines changed

src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import java.net.InetSocketAddress
55
import akka.actor.ActorRef
66
import akka.util.ByteString
77
import scala.concurrent.duration.FiniteDuration
8-
import scala.util.Random
98

109
case class ConnectedPeers(
1110
private val incomingPendingPeers: Map[PeerId, Peer],
@@ -85,9 +84,10 @@ case class ConnectedPeers(
8584
}
8685

8786
def prunePeers(
88-
incoming: Boolean,
8987
minAge: FiniteDuration,
9088
numPeers: Int,
89+
priority: PeerId => Double = _ => 0.0,
90+
incoming: Boolean = true,
9191
currentTimeMillis: Long = System.currentTimeMillis
9292
): (Seq[Peer], ConnectedPeers) = {
9393
val ageThreshold = currentTimeMillis - minAge.toMillis
@@ -97,7 +97,7 @@ case class ConnectedPeers(
9797
} else {
9898
val candidates = handshakedPeers.values.filter(canPrune(incoming, ageThreshold)).toSeq
9999

100-
val toPrune = Random.shuffle(candidates).take(numPeers)
100+
val toPrune = candidates.sortBy(peer => priority(peer.id)).take(numPeers)
101101

102102
val pruned = copy(
103103
pruningPeers = toPrune.foldLeft(pruningPeers) { case (acc, peer) =>

src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ class PeerManagerActor(
102102
handleCommonMessages(connectedPeers) orElse
103103
handleBlacklistMessages orElse
104104
handleConnections(connectedPeers) orElse
105-
handleNewNodesToConnectMessages(connectedPeers)
105+
handleNewNodesToConnectMessages(connectedPeers) orElse
106+
handlePruning(connectedPeers)
106107
}
107108

108109
private def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = {
@@ -270,9 +271,9 @@ class PeerManagerActor(
270271
handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)
271272

272273
// It looks like all incoming slots are taken; try to make some room.
273-
val prunedConnectedPeers = pruneIncomingPeers(connectedPeers)
274+
schedulePruningIncomingPeers()
274275

275-
context become listening(prunedConnectedPeers)
276+
context become listening(connectedPeers)
276277

277278
} else if (handshakedPeer.nodeId.exists(connectedPeers.hasHandshakedWith)) {
278279
// FIXME: peers received after handshake should always have their nodeId defined, we could maybe later distinguish
@@ -301,12 +302,39 @@ class PeerManagerActor(
301302
(pendingPeer, newConnectedPeers)
302303
}
303304

305+
/** Ask for statistics and try to prune incoming peers when they arrive. */
306+
private def schedulePruningIncomingPeers(): Unit = {
307+
implicit val timeout: Timeout = Timeout(peerConfiguration.updateNodesInterval)
308+
// Using the minimum pruning age as the window is fair to every peer, old or new.
309+
val window = peerConfiguration.minPruneAge
310+
(peerStatistics ? PeerStatisticsActor.GetStatsForAll(window))
311+
.mapTo[PeerStatisticsActor.StatsForAll]
312+
.map(PruneIncomingPeers(_))
313+
.pipeTo(self)
314+
}
315+
316+
private def handlePruning(connectedPeers: ConnectedPeers): Receive = {
317+
case PruneIncomingPeers(PeerStatisticsActor.StatsForAll(stats)) =>
318+
val prunedConnectedPeers = pruneIncomingPeers(connectedPeers, stats)
319+
320+
context become listening(prunedConnectedPeers)
321+
}
322+
304323
/** Disconnect some incoming connections so we can free up slots. */
305-
private def pruneIncomingPeers(connectedPeers: ConnectedPeers): ConnectedPeers = {
324+
private def pruneIncomingPeers(
325+
connectedPeers: ConnectedPeers,
326+
stats: Map[PeerId, PeerStatisticsActor.Stat]
327+
): ConnectedPeers = {
306328
val pruneCount = PeerManagerActor.numberOfIncomingConnectionsToPrune(connectedPeers, peerConfiguration)
307-
329+
val now = System.currentTimeMillis
308330
val (peersToPrune, prunedConnectedPeers) =
309-
connectedPeers.prunePeers(incoming = true, minAge = peerConfiguration.minPruneAge, numPeers = pruneCount)
331+
connectedPeers.prunePeers(
332+
incoming = true,
333+
minAge = peerConfiguration.minPruneAge,
334+
numPeers = pruneCount,
335+
priority = prunePriority(stats, now),
336+
currentTimeMillis = now
337+
)
310338

311339
peersToPrune.foreach { peer =>
312340
peer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)
@@ -477,6 +505,8 @@ object PeerManagerActor {
477505

478506
case class PeerAddress(value: String) extends BlackListId
479507

508+
case class PruneIncomingPeers(stats: PeerStatisticsActor.StatsForAll)
509+
480510
/** Number of new connections the node should try to open at any given time. */
481511
def outgoingConnectionDemand(
482512
connectedPeers: ConnectedPeers,
@@ -500,4 +530,23 @@ object PeerManagerActor {
500530
connectedPeers.incomingHandshakedPeersCount - connectedPeers.incomingPruningPeersCount - minIncomingPeers
501531
)
502532
}
533+
534+
/** Assign a priority to peers that we can use to order connections,
535+
* with lower priorities being the ones to prune first.
536+
*/
537+
def prunePriority(stats: Map[PeerId, PeerStatisticsActor.Stat], currentTimeMillis: Long)(peerId: PeerId): Double = {
538+
stats
539+
.get(peerId)
540+
.flatMap { stat =>
541+
val maybeAgeSeconds = stat.firstSeenTimeMillis
542+
.map(currentTimeMillis - _)
543+
.map(_ / 1000)
544+
.filterNot(_ <= 0)
545+
546+
// Use the average number of responses per second over the lifetime of the connection
547+
// as an indicator of how fruitful the peer is for us.
548+
maybeAgeSeconds.map(age => stat.responsesReceived.toDouble / age)
549+
}
550+
.getOrElse(0.0)
551+
}
503552
}

src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala

Lines changed: 58 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually
2525
import org.scalatest.flatspec.AnyFlatSpecLike
2626
import org.scalatest.matchers.should.Matchers
2727
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
28-
import org.scalacheck.{Arbitrary, Gen}, Arbitrary.arbitrary
28+
import org.scalacheck.{Arbitrary, Gen, Shrink}, Arbitrary.arbitrary
2929
import scala.concurrent.duration._
3030

3131
// scalastyle:off magic.number
@@ -260,6 +260,8 @@ class PeerManagerSpec
260260
peerAsOutgoingProbe.expectMsg(PeerActor.DisconnectPeer(Disconnect.Reasons.AlreadyConnected))
261261
}
262262

263+
behavior of "outgoingConnectionDemand"
264+
263265
it should "try to connect to at least min-outgoing-peers but no longer than max-outgoing-peers" in new ConnectedPeersFixture {
264266
forAll { (connectedPeers: ConnectedPeers) =>
265267
val demand = PeerManagerActor.outgoingConnectionDemand(connectedPeers, peerConfiguration)
@@ -272,6 +274,8 @@ class PeerManagerSpec
272274
}
273275
}
274276

277+
behavior of "numberOfIncomingConnectionsToPrune"
278+
275279
it should "try to prune incoming connections down to the minimum allowed number" in new ConnectedPeersFixture {
276280
forAll { (connectedPeers: ConnectedPeers) =>
277281
val numPeersToPrune = PeerManagerActor.numberOfIncomingConnectionsToPrune(connectedPeers, peerConfiguration)
@@ -289,20 +293,23 @@ class PeerManagerSpec
289293
}
290294
}
291295

296+
behavior of "ConnectedPeers.prunePeers"
297+
292298
// The `ConnectedPeers` is quite slow to generate, so doing a few tests in one go.
293-
it should "prune peers which are old enough down to incoming number, protecting against repeated forced pruning" in new ConnectedPeersFixture {
299+
it should "prune peers which are old enough, protecting against repeated forced pruning" in new ConnectedPeersFixture {
294300
forAll { (connectedPeers: ConnectedPeers) =>
295301
val numPeersToPrune = PeerManagerActor.numberOfIncomingConnectionsToPrune(connectedPeers, peerConfiguration)
296302

303+
val now = System.currentTimeMillis
304+
297305
// Prune the requested number of peers.
298306
{
299307
// Pretend we are in the future so age doesn't count.
300308
val (maxPrunedPeers, _) =
301309
connectedPeers.prunePeers(
302-
incoming = true,
303310
peerConfiguration.minPruneAge,
304311
numPeers = numPeersToPrune,
305-
currentTimeMillis = System.currentTimeMillis + peerConfiguration.minPruneAge.toMillis + 1
312+
currentTimeMillis = now + peerConfiguration.minPruneAge.toMillis + 1
306313
)
307314

308315
maxPrunedPeers.size shouldBe numPeersToPrune
@@ -311,61 +318,66 @@ class PeerManagerSpec
311318
// Only prune peers which are old enough.
312319
{
313320
val (agedPrunedPeers, _) = connectedPeers.prunePeers(
314-
incoming = true,
315321
peerConfiguration.minPruneAge,
316322
numPeers = numPeersToPrune
317323
)
318324
Inspectors.forAll(agedPrunedPeers) {
319-
_.createTimeMillis shouldBe <=(System.currentTimeMillis - peerConfiguration.minPruneAge.toMillis)
325+
_.createTimeMillis shouldBe <=(now - peerConfiguration.minPruneAge.toMillis)
320326
}
321327
}
322328

323-
// Not prune repeatedly.
329+
// Not prune twice in a row within the prune cool-of time.
324330
{
325-
val minAge = 1.day // That should include all peers in the test data
326-
327-
val (probe2, _) = connectedPeers.prunePeers(
328-
incoming = true,
331+
val now = System.currentTimeMillis
332+
val minAge = 1.minute
333+
// Check that we have at least 2 peers to prune.
334+
val (probe, _) = connectedPeers.prunePeers(
329335
minAge,
330-
numPeers = 2
336+
numPeers = Int.MaxValue,
337+
currentTimeMillis = now
331338
)
332-
if (probe2.size == 2) {
333-
val (_, pruned1) = connectedPeers.prunePeers(
334-
incoming = true,
335-
minAge,
336-
numPeers = 1
337-
)
338-
val (probe0, _) = pruned1.prunePeers(
339-
incoming = true,
340-
minAge,
341-
numPeers = 1
342-
)
343-
probe0 shouldBe empty
344-
345-
val (probe1, _) = pruned1.prunePeers(
346-
incoming = true,
347-
minAge,
348-
numPeers = 1,
349-
currentTimeMillis = System.currentTimeMillis + minAge.toMillis
350-
)
351-
probe1 should not be empty
339+
whenever(probe.size >= 2) {
340+
val (_, pruned1) = connectedPeers.prunePeers(minAge, numPeers = 1, currentTimeMillis = now)
341+
342+
pruned1.prunePeers(minAge, numPeers = 1, currentTimeMillis = now + 1)._1 shouldBe empty
343+
344+
pruned1
345+
.prunePeers(
346+
minAge,
347+
numPeers = 1,
348+
currentTimeMillis = now + minAge.toMillis
349+
)
350+
._1 should not be empty
352351
}
353352
}
354353

355354
// Not prune the same peer repeatedly.
356355
{
357356
val (peers1, pruned) = connectedPeers.prunePeers(
358-
incoming = true,
359357
peerConfiguration.minPruneAge,
360358
numPeers = numPeersToPrune
361359
)
362360
val (peers2, _) = pruned.prunePeers(
363-
incoming = true,
364361
peerConfiguration.minPruneAge,
365-
numPeers = numPeersToPrune
362+
numPeers = numPeersToPrune,
363+
currentTimeMillis = now + peerConfiguration.minPruneAge.toMillis
366364
)
367365
peers1.toSet intersect peers2.toSet shouldBe empty
368366
}
367+
368+
// Prune peers with minimum priority first.
369+
{
370+
val (peers, _) = connectedPeers.prunePeers(
371+
peerConfiguration.minPruneAge,
372+
numPeers = numPeersToPrune,
373+
priority = _.hashCode.toDouble // Dummy priority
374+
)
375+
whenever(peers.nonEmpty) {
376+
Inspectors.forAll(peers.init zip peers.tail) { case (a, b) =>
377+
a.id.hashCode shouldBe <=(b.id.hashCode)
378+
}
379+
}
380+
}
369381
}
370382
}
371383

@@ -385,7 +397,6 @@ class PeerManagerSpec
385397

386398
// Not prune again until the peers have been disconnected.
387399
val (peers, pruning) = connectedPeers.prunePeers(
388-
incoming = true,
389400
peerConfiguration.minPruneAge,
390401
numPeersToPrune0
391402
)
@@ -395,22 +406,24 @@ class PeerManagerSpec
395406
val pruned = peers.foldLeft(pruning) { case (ps, p) =>
396407
ps.removeTerminatedPeer(p.ref)._2
397408
}
409+
// We should be down to the minimum incoming peer count now.
398410
PeerManagerActor.numberOfIncomingConnectionsToPrune(pruned, peerConfiguration) shouldBe 0
399411

400412
val replenished = newIncoming.foldLeft(pruned) { case (ps, p) =>
401413
ps.addNewPendingPeer(p).promotePeerToHandshaked(p)
402414
}
415+
// We should be maxed out now, so pruning is possible again.
403416
PeerManagerActor.numberOfIncomingConnectionsToPrune(replenished, peerConfiguration) shouldBe >(0)
404417
}
405418
}
406419

407420
trait ConnectedPeersFixture {
408421
case class TestConfig(
409-
minOutgoingPeers: Int = 20,
422+
minOutgoingPeers: Int = 10,
410423
maxOutgoingPeers: Int = 30,
411424
maxIncomingPeers: Int = 30,
412425
maxPendingPeers: Int = 20,
413-
pruneIncomingPeers: Int = 10,
426+
pruneIncomingPeers: Int = 20,
414427
minPruneAge: FiniteDuration = 30.minutes
415428
) extends PeerManagerActor.PeerConfiguration.ConnectionLimits
416429

@@ -419,6 +432,9 @@ class PeerManagerSpec
419432
implicit val arbConnectedPeers: Arbitrary[ConnectedPeers] = Arbitrary {
420433
genConnectedPeers(peerConfiguration.maxIncomingPeers, peerConfiguration.maxOutgoingPeers)
421434
}
435+
436+
implicit val noShrinkConnectedPeers: Shrink[ConnectedPeers] =
437+
Shrink[ConnectedPeers](_ => Stream.empty)
422438
}
423439

424440
trait TestSetup {
@@ -508,7 +524,7 @@ class PeerManagerSpec
508524
ip <- Gen.listOfN(4, Gen.choose(0, 255)).map(_.mkString("."))
509525
port <- Gen.choose(10000, 60000)
510526
incoming <- arbitrary[Boolean]
511-
ageMillis <- Gen.choose(0, 60 * 60 * 1000)
527+
ageMillis <- Gen.choose(0, 24 * 60 * 60 * 1000)
512528
} yield Peer(
513529
remoteAddress = new InetSocketAddress(ip, port),
514530
ref = TestProbe().ref,
@@ -531,7 +547,9 @@ class PeerManagerSpec
531547
incoming <- Gen.listOfN(numIncoming, genIncomingPeer)
532548
outgoing <- Gen.listOfN(numOutgoing, genOugoingPeer)
533549
connections0 = (incoming ++ outgoing).foldLeft(ConnectedPeers.empty)(_ addNewPendingPeer _)
534-
handshaked <- Gen.someOf(incoming ++ outgoing)
550+
ratioHandshaked <- Gen.choose(0.75, 1.0)
551+
numHandshaked = (ratioHandshaked * (numIncoming + numOutgoing)).toInt
552+
handshaked <- Gen.pick(numHandshaked, incoming ++ outgoing)
535553
connections1 = handshaked.foldLeft(connections0)(_ promotePeerToHandshaked _)
536554
} yield connections1
537555

0 commit comments

Comments
 (0)