ETCM-393: Lookup on demand #803

Merged · 24 commits · Dec 2, 2020
Commits:
13b75d3
ETCM-393: Update Scalanet version.
aakoshh Nov 19, 2020
17beac9
ETCM-393: Made methods private. Moved stash to where it makes sense.
aakoshh Nov 19, 2020
d206036
ETCM-393: Ask for replacement random nodes when a connection is termi…
aakoshh Nov 19, 2020
50e3356
ETCM-393: Filter random nodes for non-local.
aakoshh Nov 19, 2020
c3cf2b7
ETCM-393: Update nix expressions after Scalanet version change.
aakoshh Nov 20, 2020
84d4e26
ETCM-393: Fix PeerManagerActor tests. Test that it asks for a random …
aakoshh Nov 20, 2020
aa1bead
ETCM-393: Unit test the random node logic in PeerDiscoveryManager.
aakoshh Nov 20, 2020
62e8608
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Nov 20, 2020
c4b09cd
ETCM-393: Remove leftover stash trigger message.
aakoshh Nov 20, 2020
3d7ff09
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Nov 20, 2020
ab8e7dd
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Nov 20, 2020
a50e37e
ETCM-393: Enroll with known peers as well as bootstraps.
aakoshh Nov 23, 2020
7fa8e44
ETCM-393: Reduce timeout so lookups don't take that long.
aakoshh Nov 23, 2020
12b2672
ETCM-393: Trigger 1 random lookup if there's unsatisfied demand.
aakoshh Nov 23, 2020
72803f1
ETCM-393: 2 messages should be enough for all neighbors to arrive.
aakoshh Nov 23, 2020
9cc2609
ETCM-393: Fix PeerManagerActor unit test: can ask for random node first.
aakoshh Nov 23, 2020
f671cc4
ETCM-393: Use random bytes for lookup.
aakoshh Nov 23, 2020
d5a8301
ETCM-393: Add comments about the Iterant consumer setup.
aakoshh Nov 24, 2020
61d8d5b
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Nov 24, 2020
14b4045
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Nov 26, 2020
e9ba46d
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Nov 27, 2020
47918d3
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Dec 2, 2020
c500fe2
ETCM-393: Change in/out connection limits from 15/45 to 30/30.
aakoshh Dec 2, 2020
fd83f09
Merge remote-tracking branch 'origin/develop' into ETCM-393-lookup-on…
aakoshh Dec 2, 2020
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -71,7 +71,7 @@ object Dependencies {
)

val network: Seq[ModuleID] = {
val scalanetVersion = "0.4.2-SNAPSHOT"
val scalanetVersion = "0.4.3-SNAPSHOT"
Seq(
"io.iohk" %% "scalanet" % scalanetVersion,
"io.iohk" %% "scalanet-discovery" % scalanetVersion
48 changes: 24 additions & 24 deletions repo.nix
@@ -9,37 +9,37 @@
"nix-public" = "";
};
"artifacts" = {
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-javadoc.jar";
sha256 = "190A1AB2C6EBEBDAE6E8729005018C6524C79E87447F09F0EC26DA7005D27AE2";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT-javadoc.jar";
sha256 = "4AE14367AB90A6D14E96A31C10A7AC9F6E4C73E24BEE77C2547388D01C450B75";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-sources.jar";
sha256 = "BE91870FA3F3F1B4D37344563B33CB06E6451788CACF2CB7BAA77C0108D8E2E5";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT-sources.jar";
sha256 = "4594C40ADE4F997D6146B63AE2D2A9F04F7ECFFA353CF7CE5CBA466B56539058";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.jar";
sha256 = "2FCBBF064CC95DA4328FB259F4424C1B745826A308249169EE4343C0C962CE94";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT.jar";
sha256 = "33A254B5BC97B74A640AEB47ECA2C1BCDF4E751832DEB05AAFFF291E3DCD1F47";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.pom";
sha256 = "46B60B737421B7E5E4CEB6412DCAAD80221EDC3ACD7E2227CDA01F1A9F57E18E";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.3-SNAPSHOT/scalanet-discovery_2.12-0.4.3-SNAPSHOT.pom";
sha256 = "D2033E10E494A26D22499EE410175CAC53D0067B039C155A92CF7F74D0AEA5EA";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-javadoc.jar";
sha256 = "84D56180DC60D6F4F6911D4507622E91DE4E2C96582F97784135DBFDECB0B12B";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT-javadoc.jar";
sha256 = "95C856E3B8F53AEE7C184D5300E9755CA871F6C030238075EFDE2F10C8AA7D71";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-sources.jar";
sha256 = "07553109F5461D45AC1B048C134132E6CBB32EC838300093E550B8CFE62C434B";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT-sources.jar";
sha256 = "3B2112DCD566F433D703B18ADCF34D03C0A841FFE627D79C4005285151B9E489";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.jar";
sha256 = "7DF884B172D973459BB7AF6553014B984FE87EC788DD35AEB27F0DEDDD4B215E";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT.jar";
sha256 = "84FF9380CBA521F1C3B2C09F1E91934C83D03A72B54BE22814D8534EE4660B88";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.pom";
sha256 = "A27D2641B97A06FDA1B2C81FA96FA1ECA1E563ACA36CEEF3C8C6070C0F1CF731";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.3-SNAPSHOT/scalanet_2.12-0.4.3-SNAPSHOT.pom";
sha256 = "C403E63E23785D7E09B0A9F4E0AB5544607A08D9F9596157F8A0ACC14B248344";
};
"nix-public/ch/megard/akka-http-cors_2.12/1.1.0/akka-http-cors_2.12-1.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/ch/megard/akka-http-cors_2.12/1.1.0/akka-http-cors_2.12-1.1.0-javadoc.jar";
10 changes: 5 additions & 5 deletions src/main/resources/application.conf
@@ -90,10 +90,10 @@ mantis {
kademlia-bucket-size = 16

# Timeout for individual requests like Ping.
request-timeout = 3.seconds
request-timeout = 1.seconds

# Timeout to collect all possible responses for a FindNode request.
kademlia-timeout = 7.seconds
kademlia-timeout = 2.seconds

# Level of concurrency during lookups and enrollment.
kademlia-alpha = 3
@@ -143,10 +143,10 @@
max-mpt-components-per-message = 200

# Maximum number of peers this node can connect to
max-outgoing-peers = 45
max-outgoing-peers = 30

# Maximum number of peers that can connect to this node
max-incoming-peers = 15
max-incoming-peers = 30

# Maximum number of peers that can be connecting to this node
max-pending-peers = 20
@@ -155,7 +155,7 @@
update-nodes-initial-delay = 5.seconds

# Newly discovered nodes connect attempt interval
update-nodes-interval = 10.seconds
update-nodes-interval = 30.seconds

# Peers which disconnect during the TCP connection because of too many peers will not be retried for this short duration
short-blacklist-duration = 6.minutes
3 changes: 2 additions & 1 deletion src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala
@@ -31,11 +31,12 @@ case class ConnectedPeers(
handshakedPeersNodeIds.contains(nodeId)

lazy val incomingPendingPeersCount: Int = incomingPendingPeers.size
lazy val outgoingPendingPeersCount: Int = outgoingPendingPeers.size
lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection }
lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection }

lazy val handshakedPeersCount: Int = handshakedPeers.size
lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeers.size
lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeersCount

def getPeer(peerId: PeerId): Option[Peer] = peers.get(peerId)

127 changes: 86 additions & 41 deletions src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
@@ -11,7 +11,7 @@ import io.iohk.ethereum.network.PeerActor.PeerClosedConnection
import io.iohk.ethereum.network.PeerActor.Status.Handshaked
import io.iohk.ethereum.network.PeerEventBusActor._
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, PeerDiscoveryManager}
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, PeerDiscoveryManager, Node}
import io.iohk.ethereum.network.handshaker.Handshaker
import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
import io.iohk.ethereum.network.p2p.Message.Version
@@ -54,6 +54,20 @@ class PeerManagerActor(

private type PeerMap = Map[PeerId, Peer]

implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {
def outgoingConnectionDemand: Int =
peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
> Review comment (aakoshh): I tried peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount - connectedPeers.outgoingPendingPeersCount, but I saw negative values, so I kept this, as it was the original logic anyway.

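The negative values mentioned in the review comment above can be illustrated with a small sketch. This is not the PR's code; the cause suggested in the comments is an assumption (a connection being counted in both the pending and established sets during the pending-to-established transition), not something verified against the codebase:

```scala
// Illustrative sketch: how subtracting both established and pending outgoing
// connections can overshoot. If a connection is briefly double counted during
// the pending -> established transition (an assumed cause, not verified),
// the combined count exceeds the true number of used slots and demand dips
// below zero.
object DemandSketch extends App {
  val maxOutgoingPeers = 30

  // Formula the author tried:
  def strictDemand(established: Int, pending: Int): Int =
    maxOutgoingPeers - established - pending

  // Formula the PR keeps (the original logic):
  def demand(established: Int): Int =
    maxOutgoingPeers - established

  // 28 established and 5 pending, 3 of which already completed and are
  // double counted: the strict formula reports negative demand.
  assert(strictDemand(28, 5) == -3)
  assert(demand(28) == 2)
  println("demand sketch ok")
}
```
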
def canConnectTo(node: Node): Boolean = {
val socketAddress = node.tcpSocketAddress
val alreadyConnected =
connectedPeers.isConnectionHandled(socketAddress) ||
connectedPeers.hasHandshakedWith(node.id)

!alreadyConnected && !isBlacklisted(PeerAddress(socketAddress.getHostString))
}
}

// Subscribe to the handshake event of any peer
peerEventBus ! Subscribe(SubscriptionClassifier.PeerHandshaked)

@@ -64,11 +78,14 @@
Stop
}

override def receive: Receive = { case StartConnecting =>
scheduleNodesUpdate()
knownNodesManager ! KnownNodesManager.GetKnownNodes
context become listening(ConnectedPeers.empty)
unstashAll()
override def receive: Receive = {
case StartConnecting =>
scheduleNodesUpdate()
knownNodesManager ! KnownNodesManager.GetKnownNodes
context become listening(ConnectedPeers.empty)
unstashAll()
case _ =>
stash()
}

private def scheduleNodesUpdate(): Unit = {
@@ -80,16 +97,14 @@
)
}

def listening(connectedPeers: ConnectedPeers): Receive = {
private def listening(connectedPeers: ConnectedPeers): Receive = {
handleCommonMessages(connectedPeers) orElse
handleBlacklistMessages orElse
connections(connectedPeers) orElse
handleNewNodesToConnectMessages(connectedPeers) orElse { case _ =>
stash()
}
handleConnections(connectedPeers) orElse
handleNewNodesToConnectMessages(connectedPeers)
}

def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = {
private def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = {
case KnownNodesManager.KnownNodes(nodes) =>
val nodesToConnect = nodes.take(peerConfiguration.maxOutgoingPeers)

@@ -100,37 +115,61 @@
log.debug("The known nodes list is empty")
}

case PeerDiscoveryManager.RandomNodeInfo(node) =>
maybeConnectToRandomNode(connectedPeers, node)

case PeerDiscoveryManager.DiscoveredNodesInfo(nodes) =>
val nodesToConnect = nodes
.filterNot { node =>
val socketAddress = node.tcpSocketAddress
val alreadyConnected =
connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith(node.id)
alreadyConnected || isBlacklisted(PeerAddress(socketAddress.getHostString))
} // not already connected to or blacklisted
.take(peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount)

NetworkMetrics.DiscoveredPeersSize.set(nodes.size)
NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size)
NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount)

log.info(
s"Discovered ${nodes.size} nodes, " +
s"Blacklisted ${blacklistedPeers.size} nodes, " +
s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " +
s"pending connection attempts ${connectedPeers.pendingPeersCount}. " +
s"Trying to connect to ${nodesToConnect.size} more nodes."
)
maybeConnectToDiscoveredNodes(connectedPeers, nodes)
}

if (nodesToConnect.nonEmpty) {
log.debug("Trying to connect to {} nodes", nodesToConnect.size)
nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
private def maybeConnectToRandomNode(connectedPeers: ConnectedPeers, node: Node): Unit = {
if (connectedPeers.outgoingConnectionDemand > 0) {
if (connectedPeers.canConnectTo(node)) {
log.debug(s"Trying to connect to random node at ${node.addr}")
self ! ConnectToPeer(node.toUri)
> Review question: Up for discussion (and to help me understand the PR better): why aren't we requesting another random node when, after handling the node we received, connectedPeers.outgoingConnectionDemand > 0 is still true?

> Reply (aakoshh): Sorry, I may be misunderstanding your question; I think we are doing exactly that:

   if (outgoingConnectionDemand > 0) {
      if (canConnectTo(node)) {
        // connect
      } else {
        // ask for another
        // Isn't this what you are asking? Demand is > 0 but we can't connect, so we ask for a replacement.
      }
    } else {
      // ignore because outgoingConnectionDemand <= 0
    }

} else {
log.debug("The nodes list is empty, no new nodes to connect to")
log.debug("Asking for another random node")
peerDiscoveryManager ! PeerDiscoveryManager.GetRandomNodeInfo
}
} else {
log.debug("Ignoring random node; no demand at the moment.")
}
}

def connections(connectedPeers: ConnectedPeers): Receive = {
private def maybeConnectToDiscoveredNodes(connectedPeers: ConnectedPeers, nodes: Set[Node]): Unit = {
val nodesToConnect = nodes
.filter(connectedPeers.canConnectTo)
.take(connectedPeers.outgoingConnectionDemand)

NetworkMetrics.DiscoveredPeersSize.set(nodes.size)
NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size)
NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount)

log.info(
s"Discovered ${nodes.size} nodes, " +
s"Blacklisted ${blacklistedPeers.size} nodes, " +
s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " +
s"pending connection attempts ${connectedPeers.pendingPeersCount}. " +
s"Trying to connect to ${nodesToConnect.size} more nodes."
)

if (nodesToConnect.nonEmpty) {
log.debug("Trying to connect to {} nodes", nodesToConnect.size)
nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
} else {
log.debug("The nodes list is empty, no new nodes to connect to")
}

// Make sure the background lookups keep going and we don't get stuck with 0
// nodes to connect to until the next discovery scan loop. Only sending 1
// request so we don't rack up too many pending futures, just trigger a
// search if needed.
if (connectedPeers.outgoingConnectionDemand > nodesToConnect.size) {
peerDiscoveryManager ! PeerDiscoveryManager.GetRandomNodeInfo
}
}

private def handleConnections(connectedPeers: ConnectedPeers): Receive = {
case PeerClosedConnection(peerAddress, reason) =>
blacklist(
PeerAddress(peerAddress),
@@ -145,7 +184,7 @@
connectWith(uri, connectedPeers)
}

def getBlacklistDuration(reason: Long): FiniteDuration = {
private def getBlacklistDuration(reason: Long): FiniteDuration = {
import Disconnect.Reasons._
reason match {
case TooManyPeers => peerConfiguration.shortBlacklistDuration
@@ -179,7 +218,9 @@
val (peer, newConnectedPeers) = createPeer(address, incomingConnection = true, connectedPeers)
peer.ref ! PeerActor.HandleConnection(connection, remoteAddress)
context become listening(newConnectedPeers)
case Left(error) => handleConnectionErrors(error)

case Left(error) =>
handleConnectionErrors(error)
}
}

@@ -206,7 +247,7 @@
}
}

def handleCommonMessages(connectedPeers: ConnectedPeers): Receive = {
private def handleCommonMessages(connectedPeers: ConnectedPeers): Receive = {
case GetPeers =>
getPeers(connectedPeers.peers.values.toSet).pipeTo(sender())

@@ -218,7 +259,11 @@
terminatedPeersIds.foreach { peerId =>
peerEventBus ! Publish(PeerEvent.PeerDisconnected(peerId))
}

// Try to replace a lost connection with another one.
log.debug(s"Demand after terminated connection: ${newConnectedPeers.outgoingConnectionDemand}")
if (newConnectedPeers.outgoingConnectionDemand > 0) {
peerDiscoveryManager ! PeerDiscoveryManager.GetRandomNodeInfo
}
context unwatch ref
context become listening(newConnectedPeers)
