1
1
package io .iohk .ethereum .network
2
2
3
- import java .net .{InetSocketAddress , URI }
4
3
import akka .actor .SupervisorStrategy .Stop
5
4
import akka .actor ._
6
5
import akka .util .{ByteString , Timeout }
@@ -22,7 +21,12 @@ import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
22
21
import monix .eval .Task
23
22
import monix .execution .{Scheduler => MonixScheduler }
24
23
import org .bouncycastle .util .encoders .Hex
24
+
25
+ import java .net .{InetSocketAddress , URI }
26
+ import java .util .Collections .newSetFromMap
27
+ import scala .collection .mutable
25
28
import scala .concurrent .duration ._
29
+ import scala .jdk .CollectionConverters ._
26
30
27
31
class PeerManagerActor (
28
32
peerEventBus : ActorRef ,
@@ -51,6 +55,8 @@ class PeerManagerActor(
51
55
import PeerManagerActor ._
52
56
import akka .pattern .pipe
53
57
58
+ val triedNodes : mutable.Set [ByteString ] = lruSet[ByteString ](maxBlacklistedNodes)
59
+
54
60
implicit class ConnectedPeersOps (connectedPeers : ConnectedPeers ) {
55
61
56
62
/** Number of new connections the node should try to open at any given time. */
@@ -126,6 +132,7 @@ class PeerManagerActor(
126
132
private def maybeConnectToRandomNode (connectedPeers : ConnectedPeers , node : Node ): Unit = {
127
133
if (connectedPeers.outgoingConnectionDemand > 0 ) {
128
134
if (connectedPeers.canConnectTo(node)) {
135
+ triedNodes.add(node.id)
129
136
self ! ConnectToPeer (node.toUri)
130
137
} else {
131
138
peerDiscoveryManager ! PeerDiscoveryManager .GetRandomNodeInfo
@@ -134,9 +141,15 @@ class PeerManagerActor(
134
141
}
135
142
136
143
private def maybeConnectToDiscoveredNodes (connectedPeers : ConnectedPeers , nodes : Set [Node ]): Unit = {
137
- val nodesToConnect = nodes
144
+ val discoveredNodes = nodes
138
145
.filter(connectedPeers.canConnectTo)
139
- .take(connectedPeers.outgoingConnectionDemand)
146
+
147
+ val nodesToConnect = discoveredNodes
148
+ .filterNot(n => triedNodes.contains(n.id)) match {
149
+ case seq if seq.size >= connectedPeers.outgoingConnectionDemand =>
150
+ seq.take(connectedPeers.outgoingConnectionDemand)
151
+ case _ => discoveredNodes.take(connectedPeers.outgoingConnectionDemand)
152
+ }
140
153
141
154
NetworkMetrics .DiscoveredPeersSize .set(nodes.size)
142
155
NetworkMetrics .BlacklistedPeersSize .set(blacklistedPeers.size)
@@ -152,14 +165,17 @@ class PeerManagerActor(
152
165
153
166
if (nodesToConnect.nonEmpty) {
154
167
log.debug(" Trying to connect to {} nodes" , nodesToConnect.size)
155
- nodesToConnect.foreach(n => self ! ConnectToPeer (n.toUri))
168
+ nodesToConnect.foreach(n => {
169
+ triedNodes.add(n.id)
170
+ self ! ConnectToPeer (n.toUri)
171
+ })
156
172
} else {
157
173
log.debug(" The nodes list is empty, no new nodes to connect to" )
158
174
}
159
175
160
176
// Make sure the background lookups keep going and we don't get stuck with 0
161
177
// nodes to connect to until the next discovery scan loop. Only sending 1
162
- // request so we don't rack up too many pending futures, just trigger a a
178
+ // request so we don't rack up too many pending futures, just trigger a
163
179
// search if needed.
164
180
if (connectedPeers.outgoingConnectionDemand > nodesToConnect.size) {
165
181
peerDiscoveryManager ! PeerDiscoveryManager .GetRandomNodeInfo
@@ -184,7 +200,7 @@ class PeerManagerActor(
184
200
private def getBlacklistDuration (reason : Long ): FiniteDuration = {
185
201
import Disconnect .Reasons ._
186
202
reason match {
187
- case TooManyPeers => peerConfiguration.shortBlacklistDuration
203
+ case TooManyPeers | AlreadyConnected | ClientQuitting => peerConfiguration.shortBlacklistDuration
188
204
case _ => peerConfiguration.longBlacklistDuration
189
205
}
190
206
}
@@ -552,4 +568,9 @@ object PeerManagerActor {
552
568
}
553
569
.getOrElse(0.0 )
554
570
}
571
+
572
+ def lruSet [A ](maxEntries : Int ): mutable.Set [A ] =
573
+ newSetFromMap[A ](new java.util.LinkedHashMap [A , java.lang.Boolean ]() {
574
+ override def removeEldestEntry (eldest : java.util.Map .Entry [A , java.lang.Boolean ]): Boolean = size > maxEntries
575
+ }).asScala
555
576
}
0 commit comments