ETCM-393: Lookup on demand #803
Changes from all commits (diff to PeerManagerActor):
@@ -11,7 +11,7 @@ import io.iohk.ethereum.network.PeerActor.PeerClosedConnection
 import io.iohk.ethereum.network.PeerActor.Status.Handshaked
 import io.iohk.ethereum.network.PeerEventBusActor._
 import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
-import io.iohk.ethereum.network.discovery.{DiscoveryConfig, PeerDiscoveryManager}
+import io.iohk.ethereum.network.discovery.{DiscoveryConfig, PeerDiscoveryManager, Node}
 import io.iohk.ethereum.network.handshaker.Handshaker
 import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
 import io.iohk.ethereum.network.p2p.Message.Version
@@ -54,6 +54,20 @@ class PeerManagerActor(

   private type PeerMap = Map[PeerId, Peer]

+  implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {
+    def outgoingConnectionDemand: Int =
+      peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
+
+    def canConnectTo(node: Node): Boolean = {
+      val socketAddress = node.tcpSocketAddress
+      val alreadyConnected =
+        connectedPeers.isConnectionHandled(socketAddress) ||
+          connectedPeers.hasHandshakedWith(node.id)
+
+      !alreadyConnected && !isBlacklisted(PeerAddress(socketAddress.getHostString))
+    }
+  }
+
   // Subscribe to the handshake event of any peer
   peerEventBus ! Subscribe(SubscriptionClassifier.PeerHandshaked)
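For context on the enrichment pattern above: because `ConnectedPeersOps` is an implicit class defined inside the actor, `outgoingConnectionDemand` and `canConnectTo` read like methods on `ConnectedPeers` while still closing over `peerConfiguration` and `isBlacklisted`. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the actual Mantis classes:

```scala
// Hypothetical, simplified stand-ins for the real ConnectedPeers / PeerConfiguration types.
final case class PeerConfiguration(maxOutgoingPeers: Int)
final case class ConnectedPeers(outgoingPeersCount: Int)

class PeerManagerLike(peerConfiguration: PeerConfiguration) {

  // Enrichment: adds a demand helper to ConnectedPeers without touching its definition,
  // while still closing over the surrounding peerConfiguration.
  implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {
    def outgoingConnectionDemand: Int =
      peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
  }

  def report(connectedPeers: ConnectedPeers): String =
    s"demand=${connectedPeers.outgoingConnectionDemand}" // resolved via ConnectedPeersOps
}

object ConnectedPeersOpsDemo extends App {
  val manager = new PeerManagerLike(PeerConfiguration(maxOutgoingPeers = 30))
  println(manager.report(ConnectedPeers(outgoingPeersCount = 12))) // prints: demand=18
}
```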
@@ -64,11 +78,14 @@ class PeerManagerActor(
       Stop
   }

-  override def receive: Receive = { case StartConnecting =>
-    scheduleNodesUpdate()
-    knownNodesManager ! KnownNodesManager.GetKnownNodes
-    context become listening(ConnectedPeers.empty)
-    unstashAll()
+  override def receive: Receive = {
+    case StartConnecting =>
+      scheduleNodesUpdate()
+      knownNodesManager ! KnownNodesManager.GetKnownNodes
+      context become listening(ConnectedPeers.empty)
+      unstashAll()
+    case _ =>
+      stash()
   }

   private def scheduleNodesUpdate(): Unit = {
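The reshaped `receive` makes the start-up behaviour explicit: anything that arrives before `StartConnecting` is stashed and replayed once the actor switches to `listening`. A minimal, standalone illustration of that Akka stash-until-ready shape (a simplified actor, not the real PeerManagerActor):

```scala
import akka.actor.{Actor, ActorSystem, Props, Stash}

object StashUntilStarted {
  case object Start
}

// Illustrative only: buffers every message until Start arrives, then replays
// the stash, mirroring the StartConnecting / stash() / unstashAll() shape above.
class StashUntilStarted extends Actor with Stash {
  import StashUntilStarted.Start

  def receive: Receive = {
    case Start =>
      context become ready
      unstashAll()
    case _ =>
      stash()
  }

  def ready: Receive = { case msg =>
    println(s"handling $msg")
  }
}

object StashDemo extends App {
  val system = ActorSystem("stash-demo")
  val actor = system.actorOf(Props(new StashUntilStarted))
  actor ! "hello"                 // stashed until Start
  actor ! StashUntilStarted.Start // becomes ready and replays "hello"
}
```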
@@ -80,16 +97,14 @@ class PeerManagerActor(
     )
   }

-  def listening(connectedPeers: ConnectedPeers): Receive = {
+  private def listening(connectedPeers: ConnectedPeers): Receive = {
     handleCommonMessages(connectedPeers) orElse
       handleBlacklistMessages orElse
-      connections(connectedPeers) orElse
-      handleNewNodesToConnectMessages(connectedPeers) orElse { case _ =>
-        stash()
-      }
+      handleConnections(connectedPeers) orElse
+      handleNewNodesToConnectMessages(connectedPeers)
   }

-  def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = {
+  private def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = {
     case KnownNodesManager.KnownNodes(nodes) =>
       val nodesToConnect = nodes.take(peerConfiguration.maxOutgoingPeers)
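With the catch-all `stash()` now living only in the pre-start `receive`, `listening` becomes a plain `orElse` chain of partial functions. A tiny, hypothetical illustration of how such `Receive` handlers compose:

```scala
import akka.actor.Actor

// Illustrative names only: shows how Receive partial functions, like the ones
// composed in `listening`, are tried left to right via orElse.
class ComposedHandlers extends Actor {
  private def handlePing: Receive = { case "ping" => sender() ! "pong" }
  private def handleStatus: Receive = { case "status" => sender() ! "ok" }

  // A message is handled by the first partial function defined at it;
  // anything unmatched ends up as an unhandled message.
  override def receive: Receive = handlePing orElse handleStatus
}
```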
@@ -100,37 +115,61 @@ class PeerManagerActor(
         log.debug("The known nodes list is empty")
       }

+    case PeerDiscoveryManager.RandomNodeInfo(node) =>
+      maybeConnectToRandomNode(connectedPeers, node)
+
     case PeerDiscoveryManager.DiscoveredNodesInfo(nodes) =>
-      val nodesToConnect = nodes
-        .filterNot { node =>
-          val socketAddress = node.tcpSocketAddress
-          val alreadyConnected =
-            connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith(node.id)
-          alreadyConnected || isBlacklisted(PeerAddress(socketAddress.getHostString))
-        } // not already connected to or blacklisted
-        .take(peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount)
-
-      NetworkMetrics.DiscoveredPeersSize.set(nodes.size)
-      NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size)
-      NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount)
-
-      log.info(
-        s"Discovered ${nodes.size} nodes, " +
-          s"Blacklisted ${blacklistedPeers.size} nodes, " +
-          s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " +
-          s"pending connection attempts ${connectedPeers.pendingPeersCount}. " +
-          s"Trying to connect to ${nodesToConnect.size} more nodes."
-      )
-
-      if (nodesToConnect.nonEmpty) {
-        log.debug("Trying to connect to {} nodes", nodesToConnect.size)
-        nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
-      } else {
-        log.debug("The nodes list is empty, no new nodes to connect to")
-      }
-  }
+      maybeConnectToDiscoveredNodes(connectedPeers, nodes)
+  }
+
+  private def maybeConnectToRandomNode(connectedPeers: ConnectedPeers, node: Node): Unit = {
+    if (connectedPeers.outgoingConnectionDemand > 0) {
+      if (connectedPeers.canConnectTo(node)) {
+        log.debug(s"Trying to connect to random node at ${node.addr}")
+        self ! ConnectToPeer(node.toUri)
Review comment (inline, on the lines above): Up-to-discussion (and also for me understanding better the PR): why don't we request another random node when, with the node received, we still didn't get enough (that is:

Reply: Sorry, I may be misunderstanding your question; I think we are doing exactly that:

if (outgoingConnectionDemand > 0) {
  if (canConnectTo(node)) {
    // connect
  } else {
    // ask for another
  }
} else {
  // ignore because outgoingConnectionDemand <= 0
}

Isn't this what you are asking? Demand is > 0 but we can't connect, so we ask for a replacement.
+      } else {
+        log.debug("Asking for another random node")
+        peerDiscoveryManager ! PeerDiscoveryManager.GetRandomNodeInfo
+      }
+    } else {
+      log.debug("Ignoring random node; no demand at the moment.")
+    }
+  }

-  def connections(connectedPeers: ConnectedPeers): Receive = {
+  private def maybeConnectToDiscoveredNodes(connectedPeers: ConnectedPeers, nodes: Set[Node]): Unit = {
+    val nodesToConnect = nodes
+      .filter(connectedPeers.canConnectTo)
+      .take(connectedPeers.outgoingConnectionDemand)
+
+    NetworkMetrics.DiscoveredPeersSize.set(nodes.size)
+    NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size)
+    NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount)
+
+    log.info(
+      s"Discovered ${nodes.size} nodes, " +
+        s"Blacklisted ${blacklistedPeers.size} nodes, " +
+        s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " +
+        s"pending connection attempts ${connectedPeers.pendingPeersCount}. " +
+        s"Trying to connect to ${nodesToConnect.size} more nodes."
+    )
+
+    if (nodesToConnect.nonEmpty) {
+      log.debug("Trying to connect to {} nodes", nodesToConnect.size)
+      nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
+    } else {
+      log.debug("The nodes list is empty, no new nodes to connect to")
+    }
+
+    // Make sure the background lookups keep going and we don't get stuck with 0
+    // nodes to connect to until the next discovery scan loop. Only sending 1
+    // request so we don't rack up too many pending futures, just trigger a
+    // search if needed.
+    if (connectedPeers.outgoingConnectionDemand > nodesToConnect.size) {
+      peerDiscoveryManager ! PeerDiscoveryManager.GetRandomNodeInfo
+    }
+  }
+
+  private def handleConnections(connectedPeers: ConnectedPeers): Receive = {
     case PeerClosedConnection(peerAddress, reason) =>
       blacklist(
         PeerAddress(peerAddress),
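The two new methods boil down to a small decision rule: connect if there is demand and the node is usable; ask discovery for a replacement if there is demand but the node is not usable; ignore otherwise; and after a full scan, send at most one extra `GetRandomNodeInfo` if the scan did not cover the remaining demand. A dependency-free sketch of that rule — the names `LookupOnDemand`, `LookupDecision`, `decide`, and `needsExtraLookup` are illustrative, not part of the PR:

```scala
object LookupOnDemand {
  // Illustrative only: models the branching of maybeConnectToRandomNode and the
  // post-scan top-up check of maybeConnectToDiscoveredNodes as pure functions.
  sealed trait LookupDecision
  case object Connect extends LookupDecision           // demand > 0 and the node is usable
  case object AskForReplacement extends LookupDecision // demand > 0 but the node is already connected or blacklisted
  case object Ignore extends LookupDecision            // no outgoing demand right now

  def decide(outgoingConnectionDemand: Int, canConnect: Boolean): LookupDecision =
    if (outgoingConnectionDemand <= 0) Ignore
    else if (canConnect) Connect
    else AskForReplacement

  // Mirrors the post-scan check `outgoingConnectionDemand > nodesToConnect.size`:
  // ask for one more random lookup only when a scan left demand uncovered.
  def needsExtraLookup(outgoingConnectionDemand: Int, nodesToConnect: Int): Boolean =
    outgoingConnectionDemand > nodesToConnect
}
```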
@@ -145,7 +184,7 @@ class PeerManagerActor(
       connectWith(uri, connectedPeers)
   }

-  def getBlacklistDuration(reason: Long): FiniteDuration = {
+  private def getBlacklistDuration(reason: Long): FiniteDuration = {
     import Disconnect.Reasons._
     reason match {
       case TooManyPeers => peerConfiguration.shortBlacklistDuration
@@ -179,7 +218,9 @@ class PeerManagerActor(
         val (peer, newConnectedPeers) = createPeer(address, incomingConnection = true, connectedPeers)
         peer.ref ! PeerActor.HandleConnection(connection, remoteAddress)
         context become listening(newConnectedPeers)
-      case Left(error) => handleConnectionErrors(error)
+
+      case Left(error) =>
+        handleConnectionErrors(error)
     }
   }
@@ -206,7 +247,7 @@ class PeerManagerActor(
     }
   }

-  def handleCommonMessages(connectedPeers: ConnectedPeers): Receive = {
+  private def handleCommonMessages(connectedPeers: ConnectedPeers): Receive = {
     case GetPeers =>
       getPeers(connectedPeers.peers.values.toSet).pipeTo(sender())
@@ -218,7 +259,11 @@ class PeerManagerActor(
       terminatedPeersIds.foreach { peerId =>
        peerEventBus ! Publish(PeerEvent.PeerDisconnected(peerId))
       }
+
+      // Try to replace a lost connection with another one.
+      log.debug(s"Demand after terminated connection: ${newConnectedPeers.outgoingConnectionDemand}")
+      if (newConnectedPeers.outgoingConnectionDemand > 0) {
+        peerDiscoveryManager ! PeerDiscoveryManager.GetRandomNodeInfo
+      }
       context unwatch ref
       context become listening(newConnectedPeers)
Author comment on the `outgoingConnectionDemand` definition: I tried

peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount - connectedPeers.outgoingPendingPeersCount

but I saw negative values, so I kept this as it was the original logic anyway.
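For reference, one way the pending count could be included without producing the negative values mentioned above would be to clamp the result at zero. This is only a hypothetical alternative, not what the PR merges:

```scala
object DemandWithPending {
  // Hypothetical variant; the PR deliberately keeps the original
  // `maxOutgoingPeers - outgoingPeersCount` definition.
  def outgoingConnectionDemand(
      maxOutgoingPeers: Int,
      outgoingPeersCount: Int,
      outgoingPendingPeersCount: Int
  ): Int =
    math.max(0, maxOutgoingPeers - outgoingPeersCount - outgoingPendingPeersCount)
}
```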