@@ -54,8 +54,10 @@ class PeerManagerActor(
   import akka.pattern.{ask, pipe}

   implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {
+
+    /** Number of new connections the node should try to open at any given time. */
     def outgoingConnectionDemand: Int =
-      peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
+      PeerManagerActor.outgoingConnectionDemand(connectedPeers, peerConfiguration)

     def canConnectTo(node: Node): Boolean = {
       val socketAddress = node.tcpSocketAddress
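Note that the demand calculation moves out of the extension method into a pure function on the companion object (added near the end of this diff), so the arithmetic can be unit-tested without an actor system. A standalone sketch of that logic; the names mirror the diff, but all numbers are hypothetical since the PR does not fix concrete limits:

```scala
// Standalone mirror of the new demand logic; inputs are hypothetical.
def demand(outgoingHandshaked: Int, outgoingTotal: Int, minOutgoing: Int, maxOutgoing: Int): Int =
  if (outgoingHandshaked >= minOutgoing) 0 // minimum already met, stop dialing
  else maxOutgoing - outgoingTotal         // top up to the cap, counting pending peers

assert(demand(outgoingHandshaked = 25, outgoingTotal = 27, minOutgoing = 20, maxOutgoing = 45) == 0)
assert(demand(outgoingHandshaked = 10, outgoingTotal = 15, minOutgoing = 20, maxOutgoing = 45) == 30)
```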
@@ -100,7 +102,8 @@ class PeerManagerActor(
     handleCommonMessages(connectedPeers) orElse
       handleBlacklistMessages orElse
       handleConnections(connectedPeers) orElse
-      handleNewNodesToConnectMessages(connectedPeers)
+      handleNewNodesToConnectMessages(connectedPeers) orElse
+      handlePruning(connectedPeers)
   }

   private def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = {
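Since `Receive` is just `PartialFunction[Any, Unit]`, chaining with `orElse` tries each handler in declaration order, so `handlePruning` is only consulted for messages none of the earlier handlers match. A minimal illustration with toy messages, not the actor's real protocol:

```scala
// Toy illustration of Receive composition via orElse.
val connections: PartialFunction[Any, Unit] = { case "connect" => println("handleConnections") }
val pruning: PartialFunction[Any, Unit] = { case "prune" => println("handlePruning") }

val receive = connections orElse pruning
receive("prune") // falls through `connections`, matched by `pruning`
```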
@@ -266,6 +269,12 @@ class PeerManagerActor(
         handshakedPeer.incomingConnection && connectedPeers.incomingHandshakedPeersCount >= peerConfiguration.maxIncomingPeers
       ) {
         handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)
+
+        // It looks like all incoming slots are taken; try to make some room.
+        self ! SchedulePruneIncomingPeers
+
+        context become listening(connectedPeers)
+
       } else if (handshakedPeer.nodeId.exists(connectedPeers.hasHandshakedWith)) {
         // FIXME: peers received after handshake should always have their nodeId defined, we could maybe later distinguish
         // it into PendingPeer/HandshakedPeer classes
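Rather than pruning inline, the actor rejects the newcomer and sends `SchedulePruneIncomingPeers` to itself, so the prune request is processed in mailbox order, after any connection updates already queued. A minimal sketch of that self-message idiom; the actor and messages here are hypothetical stand-ins, not code from this PR:

```scala
import akka.actor.Actor

// Hypothetical sketch of deferring follow-up work through the mailbox.
case object SlotsFull
case object MakeRoom

class Gate extends Actor {
  def receive: Receive = {
    case SlotsFull =>
      // Reject the newcomer first, then queue a request to free capacity;
      // it will run after any messages that arrived in the meantime.
      self ! MakeRoom
    case MakeRoom =>
      () // free capacity here
  }
}
```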
@@ -286,13 +295,54 @@ class PeerManagerActor(
   ): (Peer, ConnectedPeers) = {
     val ref = peerFactory(context, address, incomingConnection)
     context watch ref
-    val pendingPeer = Peer(address, ref, incomingConnection, None)
+    val pendingPeer = Peer(address, ref, incomingConnection, None, createTimeMillis = System.currentTimeMillis)

     val newConnectedPeers = connectedPeers.addNewPendingPeer(pendingPeer)

     (pendingPeer, newConnectedPeers)
   }

+  private def handlePruning(connectedPeers: ConnectedPeers): Receive = {
+    case SchedulePruneIncomingPeers =>
+      implicit val timeout: Timeout = Timeout(peerConfiguration.updateNodesInterval)
+
+      // Ask for the whole statistics duration; we'll use averages to make it fair.
+      val window = peerConfiguration.statSlotCount * peerConfiguration.statSlotDuration
+
+      (peerStatistics ? PeerStatisticsActor.GetStatsForAll(window))
+        .mapTo[PeerStatisticsActor.StatsForAll]
+        .map(PruneIncomingPeers(_))
+        .pipeTo(self)
+
+    case PruneIncomingPeers(PeerStatisticsActor.StatsForAll(stats)) =>
+      val prunedConnectedPeers = pruneIncomingPeers(connectedPeers, stats)
+
+      context become listening(prunedConnectedPeers)
+  }
+
+  /** Disconnect some incoming connections so we can free up slots. */
+  private def pruneIncomingPeers(
+      connectedPeers: ConnectedPeers,
+      stats: Map[PeerId, PeerStat]
+  ): ConnectedPeers = {
+    val pruneCount = PeerManagerActor.numberOfIncomingConnectionsToPrune(connectedPeers, peerConfiguration)
+    val now = System.currentTimeMillis
+    val (peersToPrune, prunedConnectedPeers) =
+      connectedPeers.prunePeers(
+        incoming = true,
+        minAge = peerConfiguration.minPruneAge,
+        numPeers = pruneCount,
+        priority = prunePriority(stats, now),
+        currentTimeMillis = now
+      )
+
+    peersToPrune.foreach { peer =>
+      peer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)
+    }
+
+    prunedConnectedPeers
+  }
+
   private def getPeers(peers: Set[Peer]): Future[Peers] = {
     implicit val timeout: Timeout = Timeout(2.seconds)
@@ -394,7 +444,7 @@ object PeerManagerActor {
     ctx.actorOf(props, id)
   }

-  trait PeerConfiguration {
+  trait PeerConfiguration extends PeerConfiguration.ConnectionLimits {
     val connectRetryDelay: FiniteDuration
     val connectMaxRetries: Int
     val disconnectPoisonPillTimeout: FiniteDuration
@@ -403,9 +453,6 @@ object PeerManagerActor {
     val waitForChainCheckTimeout: FiniteDuration
     val fastSyncHostConfiguration: FastSyncHostConfiguration
     val rlpxConfiguration: RLPxConfiguration
-    val maxOutgoingPeers: Int
-    val maxIncomingPeers: Int
-    val maxPendingPeers: Int
     val networkId: Int
     val updateNodesInitialDelay: FiniteDuration
     val updateNodesInterval: FiniteDuration
@@ -414,6 +461,16 @@ object PeerManagerActor {
     val statSlotDuration: FiniteDuration
     val statSlotCount: Int
   }
+  object PeerConfiguration {
+    trait ConnectionLimits {
+      val minOutgoingPeers: Int
+      val maxOutgoingPeers: Int
+      val maxIncomingPeers: Int
+      val maxPendingPeers: Int
+      val pruneIncomingPeers: Int
+      val minPruneAge: FiniteDuration
+    }
+  }

   trait FastSyncHostConfiguration {
     val maxBlocksHeadersPerMessage: Int
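Because `PeerConfiguration` now extends `PeerConfiguration.ConnectionLimits`, existing concrete configurations keep compiling once they provide the three new members (`minOutgoingPeers`, `pruneIncomingPeers`, `minPruneAge`). A sketch of what a concrete `ConnectionLimits` instance might look like; the values are illustrative, since this PR does not ship defaults:

```scala
import scala.concurrent.duration._

// Illustrative values only; the PR does not specify defaults for these.
val limits = new PeerManagerActor.PeerConfiguration.ConnectionLimits {
  val minOutgoingPeers: Int = 20       // below this, keep dialing out
  val maxOutgoingPeers: Int = 45       // hard cap, including pending connections
  val maxIncomingPeers: Int = 30
  val maxPendingPeers: Int = 20
  val pruneIncomingPeers: Int = 10     // incoming slots pruning may reclaim
  val minPruneAge: FiniteDuration = 30.minutes // never prune younger connections
}
```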
@@ -447,4 +504,50 @@ object PeerManagerActor {
   case class OutgoingConnectionAlreadyHandled(uri: URI) extends ConnectionError

   case class PeerAddress(value: String) extends BlackListId
+
+  case object SchedulePruneIncomingPeers
+  case class PruneIncomingPeers(stats: PeerStatisticsActor.StatsForAll)
+
+  /** Number of new connections the node should try to open at any given time. */
+  def outgoingConnectionDemand(
+      connectedPeers: ConnectedPeers,
+      peerConfiguration: PeerConfiguration.ConnectionLimits
+  ): Int = {
+    if (connectedPeers.outgoingHandshakedPeersCount >= peerConfiguration.minOutgoingPeers)
+      // We have established at least the minimum number of working connections.
+      0
+    else
+      // Try to connect to more, up to the maximum, including pending peers.
+      peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
+  }
+
+  def numberOfIncomingConnectionsToPrune(
+      connectedPeers: ConnectedPeers,
+      peerConfiguration: PeerConfiguration.ConnectionLimits
+  ): Int = {
+    val minIncomingPeers = peerConfiguration.maxIncomingPeers - peerConfiguration.pruneIncomingPeers
+    math.max(
+      0,
+      connectedPeers.incomingHandshakedPeersCount - connectedPeers.incomingPruningPeersCount - minIncomingPeers
+    )
+  }
+
+  /** Assign a priority to peers that we can use to order connections,
+    * with lower priorities being the ones to prune first.
+    */
+  def prunePriority(stats: Map[PeerId, PeerStat], currentTimeMillis: Long)(peerId: PeerId): Double = {
+    stats
+      .get(peerId)
+      .flatMap { stat =>
+        val maybeAgeSeconds = stat.firstSeenTimeMillis
+          .map(currentTimeMillis - _)
+          .map(_ / 1000)
+          .filter(_ > 0)
+
+        // Use the average number of responses per second over the lifetime of the connection
+        // as an indicator of how fruitful the peer is for us.
+        maybeAgeSeconds.map(age => stat.responsesReceived.toDouble / age)
+      }
+      .getOrElse(0.0)
+  }
 }
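To make the two calculations concrete with hypothetical numbers: with `maxIncomingPeers = 30` and `pruneIncomingPeers = 10`, the floor is 20 incoming peers, so 28 handshaked incoming peers with 3 already marked for pruning yields `max(0, 28 - 3 - 20) = 5` candidates. For priority, a peer first seen two hours ago (7,200,000 ms) that returned 3600 responses scores `3600 / 7200 = 0.5` responses per second; a peer with no recorded stats scores `0.0` and is pruned first. A standalone check of both:

```scala
// Standalone mirror of the arithmetic above; all inputs are hypothetical.
val toPrune = math.max(0, 28 - 3 - (30 - 10))
assert(toPrune == 5)

val now = 10000000L            // "current" time, in millis
val firstSeen = now - 7200000L // first seen two hours ago
val ageSeconds = (now - firstSeen) / 1000
val priority = 3600L.toDouble / ageSeconds
assert(priority == 0.5)
```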