ETCM-393: Lookup on demand #803


Merged: 24 commits into develop on Dec 2, 2020

Conversation


@aakoshh aakoshh commented Nov 19, 2020

Description

The PR adds a mechanism to PeerManagerActor and PeerDiscoveryManager that should speed up the initial discovery of peers: whenever a peer disconnects from our node, a random replacement peer is sought, fed from an infinite stream of random lookups. This works in tandem with the periodic connection attempts that Mantis already performs.

Changed the default settings to use lower request timeouts: with the old defaults a single lookup round took ~30 seconds, while with the new settings it's closer to 10 seconds. The node discovered 1000 nodes in 4 minutes (down from 10 minutes before), and 2000 nodes by the 10 minute mark.

The PR also changes the enrollment so that it re-connects to previously known nodes, not just the configured bootstrap nodes. This is in addition to the PeerDiscoveryManager serving them as "already discovered"; the benefit is that if they no longer want to connect to us, we may discover replacement nodes more quickly.

Proposed Solution

The problem with the new discovery mechanism is that it looks up nodes on a fixed schedule, and the number of nodes discovered and connected to in each round is limited. However, most nodes on etc already respond with TooManyPeers and disconnect from us, and the PeerManagerActor only attempts another round every so often (10 seconds so far). We could make the discovery interval (a different timer) more aggressive, but Mantis only wants to connect to 45 nodes; once that's reached, there's no need to rapidly fire lookup requests at peers.

With the PR's changes, the node performs random lookups on demand as long as it hasn't reached 45 outgoing connections. After that, the more relaxed discovery interval (15 minutes by default) should be sufficient. It does this by asking for one random node to replace any dropped connection; the request is served by doing a random lookup (which returns up to 16 nodes) and keeping the results buffered (up to 45 nodes in the buffer).
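
To make the replacement loop concrete, here is a minimal, self-contained sketch of the idea (not the actual PR diff), assuming Akka classic actors; apart from GetRandomNodeInfo and ConnectToPeer, which appear in the PR, the message and class names are illustrative.

import akka.actor.{Actor, ActorRef}
import java.net.URI

object ReplacementSketch {
  case object GetRandomNodeInfo                      // ask discovery for one random node
  final case class RandomNodeInfo(uri: URI)          // illustrative reply type
  final case class ConnectToPeer(uri: URI)           // connect request, handled elsewhere
  case object OutgoingConnectionTerminated           // illustrative: an outgoing peer dropped

  class PeerManagerSketch(discovery: ActorRef, maxOutgoingPeers: Int) extends Actor {
    private var outgoingPeersCount = 0

    private def outgoingConnectionDemand: Int =
      maxOutgoingPeers - outgoingPeersCount

    def receive: Receive = {
      case OutgoingConnectionTerminated =>
        outgoingPeersCount = math.max(0, outgoingPeersCount - 1)
        // One-for-one replacement: ask for a single random node per lost connection.
        if (outgoingConnectionDemand > 0)
          discovery ! GetRandomNodeInfo

      case RandomNodeInfo(uri) if outgoingConnectionDemand > 0 =>
        // Optimistically count the connection; the real actor tracks pending
        // and handshaked peers separately.
        outgoingPeersCount += 1
        self ! ConnectToPeer(uri)

      case RandomNodeInfo(_) =>
        () // demand was satisfied in the meantime; drop the node

      case ConnectToPeer(uri) =>
        // Placeholder: the real actor would establish an RLPx connection here.
        println(s"Trying to connect to random node at $uri")
    }
  }
}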

Alternatives

Another, simpler approach I considered was what trinity does, which is to add the number of required peers and a skip function to GetDiscoveredNodesInfo, letting the PeerDiscoveryManager sift through the nodes it has to try to find enough peers, returning however many it found, and kicking off a random lookup in the background if there aren't enough, unless one is already running.

The reason I thought this would yield fewer peers than the one-for-one replacement strategy in the PR is that GetDiscoveredNodesInfo is only sent on a fixed schedule. If discarding candidates takes much less time than the schedule interval, the PeerManagerActor would stay idle for longer stretches.

For example this could happen:

  1. We ask for 45 new peers. There are only 5, so we kick off a random lookup.
  2. None of the 5 can be connected to, so 10 seconds later we again ask for 45 peers. The random lookup is still running and has only found 3 additional nodes so far; we return those, but don't start a new lookup.
  3. The random lookup finishes 2 seconds later. Perhaps it found a few more new nodes, but we won't know for another 8 seconds when another round happens in the peer manager and it asks again.

Compared to this, the PR continuously runs background lookups as long as the random nodes it finds are being consumed, which happens as soon as a node cannot be connected to.

Regarding leaving the existing periodic mechanism in PeerManagerActor and adding the new like-for-like replacement strategy on top, instead of finding a unified model: @KonradStaniec expressed some anxiety over the risks that come with changing the actor, so I thought this was the lightest touch. In theory we could rely solely on the random peer dispenser (ask for 45 at the beginning and keep replacing them), but that would be more difficult to prove correct. Since the existing mechanism didn't care about occasional overflows, it should be able to handle the new random nodes as well.

Testing

Connected to etc:

./mantis-3.0/bin/mantis-launcher etc  -Dmantis.metrics.enabled=true

Then looked at the logs in ~/.mantis/etc/logs/mantis.log:

2020-11-19 19:30:11,506 [io.iohk.ethereum.network.PeerActor] - Failed to establish RLPx connection
2020-11-19 19:30:11,506 [i.i.e.network.PeerManagerActor] - Blacklisting peer (PeerAddress(142.93.219.249)), peer disconnected due to: Some other reason specific to a subprotocol
2020-11-19 19:30:11,506 [akka.io.SelectionHandler] - Monitored actor [Actor[akka://mantis_system/user/peer-manager/142.93.219.249:30303/rlpx-connection#-1908314809]] terminated
2020-11-19 19:30:11,506 [i.i.e.network.PeerManagerActor] - Demand after terminated connection: 30
2020-11-19 19:30:11,595 [i.i.s.d.e.v.DiscoveryService$ServiceImpl] - Looking up a random target...
2020-11-19 19:30:11,595 [i.i.e.n.d.PeerDiscoveryManager] - Pulling random nodes on demand...
2020-11-19 19:30:11,595 [i.i.e.network.PeerManagerActor] - Asking for another random node
2020-11-19 19:30:11,595 [i.i.e.network.PeerManagerActor] - Trying to connect to random node at /40.70.31.40
2020-11-19 19:30:11,596 [i.i.e.network.PeerManagerActor] - Trying to connect to random node at /46.101.173.99
2020-11-19 19:30:11,596 [i.i.e.network.PeerManagerActor] - Trying to connect to random node at /134.209.79.212
2020-11-19 19:30:11,596 [i.i.e.network.PeerManagerActor] - Trying to connect to random node at /42.194.218.227
2020-11-19 19:30:11,597 [i.i.e.network.PeerManagerActor] - Asking for another random node
2020-11-19 19:30:11,598 [akka.io.TcpOutgoingConnection] - Attempting connection to [/40.70.31.40:30303]
2020-11-19 19:30:11,599 [akka.io.TcpOutgoingConnection] - Attempting connection to [/46.101.173.99:30300]

@@ -53,6 +53,20 @@ class PeerManagerActor(

  private type PeerMap = Map[PeerId, Peer]

  implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {
    def outgoingConnectionDemand: Int =
      peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
Contributor Author

I tried peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount - connectedPeers.outgoingPendingPeersCount but I saw negative values, so I kept this; it was the original logic anyway.

@aakoshh aakoshh marked this pull request as ready for review November 20, 2020 14:31
      sendDiscoveredNodesInfo(discovery.map(_._1), sender)

    case GetRandomNodeInfo =>
      sendRandomNodeInfo(discovery.map(_._2), sender)
Contributor

I am not well versed in Iterant so I may be missing something, but doesn't such a strategy mean that we will have multiple concurrent lookups ongoing? I.e.

  1. PeerManager makes connections to 10 peers
  2. All of those connections fail
  3. PeerDiscoveryManager receives 10 GetRandomNodeInfo messages, which means it starts 10 concurrent random lookups?

Contributor Author

@aakoshh aakoshh Nov 23, 2020

The reason this is not going to happen is that those 10 requests don't perform 10 lookups; they try to pull 10 items from the concurrent consumer, which is based on the iterant and has its own internal queue.

What happens is that it sets up a consumer with a limited capacity, into which the iterant is constantly trying to push its items, pausing when the capacity is full. The pull that each request kicks off just tries to take 1 item from the underlying queue, which frees up space for the iterant to fill, which may result in another lookup happening via repeatEvalF.

The good thing about the ConcurrentChannel used by Iterant.consumeWithConfig is that it can be consumed concurrently and it replicates messages to all consumers as well, which is what I ended up using in the Scalanet ReqResponseChannel to replace the ConnectableSubject. But here we only have one consumer, so there's no replication.

Contributor Author

So with a buffer size of 45 and a bucket size of 16, what will happen (or rather, what would happen if the lookups were infinitely fast) is that the iterant will do a lookup, get 16 peers, and push them into the consumer, which will then have 29 more spaces in its buffer, so the iterant will do 2 more lookups to fill those, ending up blocked trying to push the last 3 items it has. The first 2 pulls will not result in further lookups, but after the 3rd item is pulled the iterant will again do a lookup and be blocked trying to push the next 16 items into the queue.
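
For readers less familiar with Iterant, here is a minimal, runnable sketch of the pattern being described, assuming Monix 3.x; lookupRandom is a stand-in that fakes a batch of 16 "discovered" nodes, and the buffer size of 45 mirrors the numbers above. It illustrates the mechanism, it is not the PR's code.

import monix.catnap.ConsumerF
import monix.eval.Task
import monix.execution.BufferCapacity
import monix.execution.Scheduler.Implicits.global
import monix.tail.Iterant
import scala.concurrent.duration._

object LookupOnDemandSketch extends App {
  // Stand-in for the discovery service's lookupRandom: each call "discovers" 16 nodes.
  def lookupRandom: Task[List[Int]] =
    Task {
      println("Performing a random lookup...")
      (1 to 16).toList
    }

  // An infinite stream of lookups, flattened into individual nodes.
  val randomNodes: Iterant[Task, Int] =
    Iterant
      .repeatEvalF(lookupRandom)
      .flatMap(nodes => Iterant.fromList[Task, Int](nodes))

  // Consume through a bounded buffer: the producer performs lookups only while
  // there is spare capacity; once the buffer is full, further lookups wait
  // until items are pulled out.
  val config = ConsumerF.Config(capacity = Some(BufferCapacity.Bounded(45)))

  val program = randomNodes.consumeWithConfig(config).use { consumer =>
    // Pull a handful of nodes on demand, as the PeerManagerActor would do
    // whenever an outgoing connection is lost.
    Task.sequence(List.fill(5)(consumer.pull)).map(_.foreach(println))
  }

  program.runSyncUnsafe(10.seconds)
}

Running this prints a few lookup messages up front, while the bounded buffer is being filled, and then a Right(node) value for each pull.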

Contributor

Oh, sounds neat! Could you maybe add this (or some shortened version) as a comment where the Iterant is instantiated?
Most people (including me) are pretty well versed in Observable and Task, but Iterant is a somewhat less used part of Monix, and it would be nice to have this explanation.

Contributor Author

Sure, added comments. I liked Iterant a lot; it's very similar to Observable in its API, but constructing it recursively is much more intuitive, so things like lazy database iterators using cursors are easier to build.

  randomNodes = Iterant
    .repeatEvalF {
      Task(log.debug("Pulling random nodes on demand...")) >>
        service.lookupRandom
Contributor

This is something I just noticed: lookupRandom uses sigalg.newKeyPair._1 to generate a random key for each lookup, and our implementation Secp256k1SigAlg actually generates real keys, which can be pretty costly performance-wise. Maybe for random lookups we should just generate random 64-byte values?

Contributor Author

We can do that, although I don't think it's going to have any noticeable impact. On my machine generating 1000 new keys takes ~650ms. Generating 1000 random 64-byte arrays takes just 4ms, but during my testing I only saw 1 lookup happening every ~10 seconds.
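
As an illustration of the cheaper alternative being discussed, here is a minimal sketch that generates 64-byte random lookup targets with SecureRandom and times 1000 of them. It is not the PR's benchmark, just a way to reproduce the comparison locally.

import java.security.SecureRandom

object RandomTargetSketch extends App {
  private val rnd = new SecureRandom()

  // The cheap alternative: 64 random bytes that merely look like a node ID,
  // instead of deriving a real secp256k1 public key.
  def randomTarget(): Array[Byte] = {
    val bytes = new Array[Byte](64)
    rnd.nextBytes(bytes)
    bytes
  }

  // Rough timing of 1000 targets, to compare against the ~650ms per 1000
  // key pairs quoted above.
  val start = System.nanoTime()
  (1 to 1000).foreach(_ => randomTarget())
  println(s"Generated 1000 random 64-byte targets in ${(System.nanoTime() - start) / 1000000} ms")
}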

Contributor Author

Done, although I preferred the simplicity of the original.

Contributor

@KonradStaniec KonradStaniec left a comment

Code looks good; I have also tested it on mainnet and was able to start syncing with default settings in less than 10 minutes, so LGTM!

Contributor Author

aakoshh commented Nov 24, 2020

One caveat is that with the default settings of max-outgoing-connections = 45 and max-incoming-connections = 15, in a large network we'll never stop running random lookups; the node will keep performing lookups every ~10 seconds forever, instead of relaxing to the 15 minute schedule.

That's because every (handshaked) outgoing connection is someone else's incoming connection; in a network of N nodes the total incoming count equals the total outgoing count. With nodes trying to have 3x as many outgoing connections as the incoming they allow, the incoming slots will eventually become saturated on every node, leaving an insatiable demand for ~30 extra outgoing connections (on average), forever triggering random lookups.
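
A back-of-the-envelope sketch of that saturation argument, assuming a large network of identical nodes using the default limits (the network size is illustrative):

object SaturationSketch extends App {
  val nodes = 1000 // illustrative network size
  val maxOutgoing = 45
  val maxIncoming = 15

  // Every handshaked outgoing connection occupies someone else's incoming slot,
  // so the network can hold at most nodes * maxIncoming connections in total.
  val maxConnections = nodes * maxIncoming

  // On average each node can therefore fill only maxIncoming of its 45 outgoing
  // slots, leaving a permanent demand of roughly 30 replacements per node.
  val avgOutgoingPerNode = maxConnections.toDouble / nodes
  val avgUnmetDemand = maxOutgoing - avgOutgoingPerNode

  println(s"Average satisfied outgoing per node: $avgOutgoingPerNode, unmet demand: $avgUnmetDemand")
}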

@ntallar ntallar left a comment

Some first comments!

    if (connectedPeers.outgoingConnectionDemand > 0) {
      if (connectedPeers.canConnectTo(node)) {
        log.debug(s"Trying to connect to random node at ${node.addr}")
        self ! ConnectToPeer(node.toUri)

Up for discussion (and also to help me better understand the PR): why aren't we requesting another random node if, with the node received, we still didn't get enough (that is, connectedPeers.outgoingConnectionDemand > 0 is still true)?

Contributor Author

Sorry, I may be misunderstanding your question; I think we are doing exactly that:

    if (outgoingConnectionDemand > 0) {
      if (canConnectTo(node)) {
        // connect
      } else {
        // ask for another
        // Isn't this what you are asking? Demand is > 0 but we can't connect, so we ask for a replacement.
      }
    } else {
      // ignore because outgoingConnectionDemand <= 0
    }

knownPeers = (discoveryConfig.bootstrapNodes).map { node =>
// Since we're running the enrollment in the background, it won't hold up
// anything even if we have to enroll with hundreds of previously known nodes.
knownPeers = (discoveryConfig.bootstrapNodes ++ reusedKnownNodes).map { node =>

It's a bit odd that we are repeating the same logic as in the PeerDiscoveryManager; do you think it's worth it, or would it be easy to remove this duplication?

Contributor Author

I know what you mean; I just didn't want to disturb the PeerDiscoveryManager again. Seeing that it already had that mechanism from before, I thought some redundancy doesn't hurt. It's not exactly the same though: it returns these nodes from the beginning, whereas this one needs them to also respond to pings and an ENR request first. Also, the bootstrap nodes are always returned by the PeerDiscoveryManager even if discovery is disabled, which is not the case for the service, so if you had discovery disabled you'd really have nobody to connect to.

In theory there could be thousands of known nodes (in the eclipse paper there were more than 20,000 in the database), so it's conceivable that at some point someone would decide to undo this change.

But if you feel this should be the only mechanism we keep, I'll remove the other.

@@ -35,16 +73,21 @@ class PeerDiscoveryManager(
else
knownNodesStorage.getKnownNodes().map(Node.fromUri)

bootstrapNodes ++ knownNodes
(bootstrapNodes ++ knownNodes).filterNot(isLocalNode).toVector

Minor: aren't we already filtering these out before sending them, in the sendDiscoveredNodesInfo function?

Contributor Author

We do, mainly because it will take nodes from the k-table as well, which includes the local node. But the main motivation for putting it here too was so that it doesn't have to be repeated in sendRandomNodeInfo, so I can just pick a random index without having to redo the exercise if it somehow happens to be the local node.

        )

      case None =>
        Task.pure(alreadyDiscoveredNodes(Random.nextInt(alreadyDiscoveredNodes.size)))

Shouldn't the PeerManagerActor most likely already have received this node when it received PeerDiscoveryManager.DiscoveredNodesInfo? Or is the purpose of this that it attempts to re-connect?

Contributor Author

@aakoshh aakoshh Nov 27, 2020

You are correct: if it has asked for DiscoveredNodesInfo it will already have got this node, along with everything else. However, we know that when it does so, it only ever takes up to 45 nodes from it, tries to connect, and only considers the rest when it asks for everything yet again. A connection may be blacklisted for just 6 minutes before being attempted again, but the PeerManagerActor doesn't do that on its own AFAIK, so this could be a way of trying again. Put another way: it wouldn't ask if it didn't need one. And you're right, it only uses this method if it wants to replace a connection it created as a result of DiscoveredNodesInfo, to "re-connect", albeit with a (hopefully) different node.

Also, here we cannot be certain that it's the PeerManagerActor asking for a random node.

In general, a random node is also not unlikely to have been seen before when we do random lookups, since the same nodes can be part of different search results.

    consumer.pull
      .flatMap {
        case Left(None) =>
          Task.raiseError(new IllegalStateException("The random node source is finished."))

Maybe I'm missing something, but don't these exceptions result in a Status.Failure being sent to the PeerManagerActor when piping the future to it?

If so, wouldn't it be better to handle them here and not send anything if that happened?

Contributor Author

@aakoshh aakoshh Nov 27, 2020

Correct: if an exception is raised, Status.Failure is sent to the calling actor, which in our case is ignored by the PeerManagerActor. The error would be logged by pipeToRecipient, but this situation will never happen, as the Iterant will never finish; it will keep repeating itself. I only included it here to cover all cases for the compiler; there's no need to handle anything, as it would be a programming error if this happened. I found, though, that returning errors helps in tests: instead of a timeout we get to see the error message when using the ask pattern.

@jmendiola222 jmendiola222 merged commit f51dcb5 into develop Dec 2, 2020
@aakoshh aakoshh deleted the ETCM-393-lookup-on-demand branch December 2, 2020 17:08