ETCM-393: Lookup on demand #803
Conversation
@@ -53,6 +53,20 @@ class PeerManagerActor(

  private type PeerMap = Map[PeerId, Peer]

  implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {

    def outgoingConnectionDemand: Int =
      peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
I tried peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount - connectedPeers.outgoingPendingPeersCount, but I saw negative values, so I kept this as it is; it was the original logic anyway.
        sendDiscoveredNodesInfo(discovery.map(_._1), sender)

      case GetRandomNodeInfo =>
        sendRandomNodeInfo(discovery.map(_._2), sender)
I am not well versed in Iterant so I may be missing something, but doesn't such a strategy mean that we will have multiple concurrent lookups ongoing? i.e.
- The PeerManager makes connections to 10 peers
- All of those connections fail
- The PeerDiscoveryManager receives 10 GetRandomNodeInfo messages, which means it starts 10 concurrent random lookups?
The reason this is not going to happen is that those 10 requests are not doing 10 lookups; they are trying to pull 10 items from the concurrent consumer, which is based on the Iterant and has its own internal queue.
What's happening is that it sets up a consumer with a limited capacity, into which the Iterant is constantly trying to push its items, but it pauses when the capacity is full. The pull which the requests kick off just tries to take 1 item from the underlying queue, which frees up space for the Iterant to fill, which may result in another lookup happening via repeatEvalF.
The good thing about the ConcurrentChannel used by Iterant.consumeWithConfig is that it can be consumed concurrently and it replicates messages to all consumers as well, which is what I ended up using in the Scalanet ReqResponseChannel to replace the ConnectableSubject. But here we only have one consumer, so there's no replication.
So with a buffer size of 45 and a bucket size of 16, what will happen (or rather, what would happen if the lookups were infinitely fast) is that the iterant will do a lookup, get 16 peers, and push them into the consumer, which will have 29 more spaces in the buffer, so the iterant will do 2 more lookups to fill those, ending up blocked trying to push the last 3 items it has. The first 2 pulls will not result in further lookups, but after pulling the 3rd item the iterant will again do a lookup and be blocked trying to push the next 16 items into the queue.
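To make this concrete, here is a minimal, self-contained sketch of the idea, not the PR code itself (Monix 3.x assumed; lookupRandom is a stand-in for the discovery service call, and the 45/16 numbers mirror the buffer and bucket sizes above):

import monix.catnap.ConsumerF
import monix.eval.Task
import monix.execution.BufferCapacity
import monix.execution.Scheduler.Implicits.global
import monix.tail.Iterant

object OnDemandLookupSketch extends App {
  // Pretend each lookup returns a "bucket" of 16 node IDs.
  def lookupRandom: Task[List[Int]] =
    Task { println("lookup!"); (1 to 16).toList }

  // An infinite stream of lookups, flattened to individual nodes.
  val randomNodes: Iterant[Task, Int] =
    Iterant
      .repeatEvalF(lookupRandom)
      .flatMap(nodes => Iterant.fromList[Task, Int](nodes))

  // The bounded buffer (45 here) provides the back-pressure: the Iterant keeps
  // doing lookups and pushing results until the buffer is full, then suspends;
  // each `pull` frees a slot, which can eventually trigger the next lookup.
  val demo: Task[Unit] =
    randomNodes
      .consumeWithConfig(ConsumerF.Config(capacity = Some(BufferCapacity.Bounded(45))))
      .use { consumer =>
        consumer.pull.flatMap {
          case Right(node)     => Task(println(s"pulled node $node"))
          case Left(None)      => Task(println("source finished (never happens here)"))
          case Left(Some(err)) => Task.raiseError(err)
        }
      }

  demo.runSyncUnsafe()
}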
Oh, sounds neat! Could you maybe add this (or some shortened version) as a comment where the Iterant is instantiated?
Most people (including me) are pretty well versed with Observable and Task, but Iterant is a somewhat less used part of Monix, and it would be nice to have this explanation.
Sure, added comments. I liked the Iterant a lot; it's very similar to the Observable in its API, but constructing it recursively is much more intuitive, so things like lazy database iterators using cursors are easier to build.
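As a purely illustrative example of that recursive style (Page and fetchPage are made up here, they are not part of the PR), a cursor-based database iterator could be sketched like this:

import monix.eval.Task
import monix.tail.Iterant

object CursorIterantSketch {
  final case class Page[A](rows: List[A], nextCursor: Option[Long])

  // Stand-in for a real database call that reads one page at the given cursor.
  def fetchPage(cursor: Option[Long]): Task[Page[String]] =
    Task.now(Page(List("row"), nextCursor = None))

  // Builds the stream recursively: nothing is fetched until the Iterant is
  // consumed, and each page is only read once the previous one has been emitted.
  def allRows(cursor: Option[Long] = None): Iterant[Task, String] =
    Iterant.liftF(fetchPage(cursor)).flatMap { page =>
      val rest = page.nextCursor match {
        case Some(next) => allRows(Some(next))
        case None       => Iterant.empty[Task, String]
      }
      Iterant.fromList[Task, String](page.rows) ++ rest
    }
}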
      randomNodes = Iterant
        .repeatEvalF {
          Task(log.debug("Pulling random nodes on demand...")) >>
            service.lookupRandom
This is something I just noticed: lookupRandom uses sigalg.newKeyPair._1 to generate a random key for each lookup, and our implementation Secp256k1SigAlg actually generates real keys, which can be pretty costly performance-wise. Maybe for random lookups we should just generate random 64-byte values?
We can do that, although I don't think it's going to have any noticeable impact. On my machine generating 1000 new keys takes ~650ms. Generating 1000 random 64-byte arrays is just 4ms, but during my testing I only saw 1 lookup happening every ~10 seconds.
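For reference, the cheap alternative amounts to something like the snippet below (only the random-bytes side is shown; the real Secp256k1SigAlg.newKeyPair call is not reproduced, and the timings are machine-dependent):

import java.security.SecureRandom

object RandomIdTimingSketch extends App {
  val rnd = new SecureRandom()

  // A random lookup target only has to look like a 64-byte node ID;
  // it doesn't need to correspond to a real private key.
  def randomNodeId(): Array[Byte] = {
    val bytes = new Array[Byte](64)
    rnd.nextBytes(bytes)
    bytes
  }

  val start = System.nanoTime()
  (1 to 1000).foreach(_ => randomNodeId())
  val elapsedMs = (System.nanoTime() - start) / 1000000
  // Expect single-digit milliseconds here, versus hundreds of milliseconds
  // for generating 1000 real secp256k1 key pairs.
  println(s"1000 random 64-byte IDs in ~$elapsedMs ms")
}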
Done, although I preferred the simplicity of the original.
Code looks good, I have also tested it on mainnet and was able to start syncing with the default settings in less than 10 minutes, so LGTM!
One caveat is that with the default settings the demand for outgoing connections may never be fully satisfied. That's because every (handshaked) outgoing connection is someone else's incoming connection; in a network of N nodes the total incoming count equals the total outgoing count. With nodes trying to have 3x as many outgoing connections as the incoming connections they allow, the incoming slots will eventually become saturated on every node, leaving an insatiable demand for ~30 extra outgoing connections (on average), forever triggering random lookups.
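A back-of-the-envelope check of that argument (the 45/15 split below is an assumption based on the "3x" remark, not a quote of the actual config):

object OutgoingDemandMathSketch extends App {
  val nodes = 1000 // any network size works, the per-node result is the same
  val maxOutgoingPeers = 45
  val maxIncomingPeers = 15

  // Every handshaked connection occupies one outgoing slot on one node and one
  // incoming slot on another, so the network can hold at most nodes * maxIncomingPeers
  // connections, while nodes collectively want nodes * maxOutgoingPeers of them.
  val maxConnections = nodes * maxIncomingPeers
  val desiredOutgoing = nodes * maxOutgoingPeers
  val avgUnmetDemandPerNode = (desiredOutgoing - maxConnections) / nodes

  println(s"Average unmet outgoing demand per node: $avgUnmetDemandPerNode") // 30
}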
Some first comments!
      if (connectedPeers.outgoingConnectionDemand > 0) {
        if (connectedPeers.canConnectTo(node)) {
          log.debug(s"Trying to connect to random node at ${node.addr}")
          self ! ConnectToPeer(node.toUri)
Up for discussion (and also to help me understand the PR better): why aren't we requesting another random node in case we didn't get enough with the node received (that is, connectedPeers.outgoingConnectionDemand > 0 is still true)?
Sorry, I may be misunderstanding your question; I think we are doing exactly that:
if (outgoingConnectionDemand > 0) {
if (canConnectTo(node)) {
// connect
} else {
// ask for another
// Isn't this what you are asking? Demand is > 0 but we can't connect, so we ask for a replacement.
}
} else {
// ignore because outgoingConnectionDemand <= 0
}
-      knownPeers = (discoveryConfig.bootstrapNodes).map { node =>
+      // Since we're running the enrollment in the background, it won't hold up
+      // anything even if we have to enroll with hundreds of previously known nodes.
+      knownPeers = (discoveryConfig.bootstrapNodes ++ reusedKnownNodes).map { node =>
It's a bit odd that we are repeating this same logic as on the PeerDiscoveryManager, do you think it's worth it or easy to remove this duplication?
I know what you mean, I just didn't want to disturb the PeerDiscoveryManager again; seeing that it already had that mechanism from before, I thought some redundancy doesn't hurt. It's not exactly the same though: it returns these nodes from the beginning, whereas this one needs them to also respond to pings and an ENR request first. Also, the bootstrap nodes are always returned by the PeerDiscoveryManager, even if discovery is disabled, which is not the case with the service, so if you had discovery disabled you'd really have nobody to connect to.
In theory there could be thousands of known nodes (in the eclipse paper there were more than 20,000 in the database), so it's conceivable that at some point someone would decide to undo this change.
But if you feel like this one is the only mechanism we should keep, I'll remove the other.
@@ -35,16 +73,21 @@ class PeerDiscoveryManager(
        else
          knownNodesStorage.getKnownNodes().map(Node.fromUri)

-      bootstrapNodes ++ knownNodes
+      (bootstrapNodes ++ knownNodes).filterNot(isLocalNode).toVector
Minor: aren't we already filtering this before sending them in the sendDiscoveredNodesInfo function?
We do, mainly because it will take nodes from the k-table as well, which includes the local node. But the main motivation for putting it here as well was so it doesn't have to be done in sendRandomNodeInfo too, so that I can just pick a random index from it without having to repeat the exercise if it somehow happens to be the local node.
          )

        case None =>
          Task.pure(alreadyDiscoveredNodes(Random.nextInt(alreadyDiscoveredNodes.size)))
Shouldn't the PeerManagerActor most likely already have received this node when it received PeerDiscoveryManager.DiscoveredNodesInfo? Or is the purpose of this that it attempts to re-connect?
You are correct, if it has asked for DiscoveredNodesInfo it will have already got this node, as well as everything else. However, we know that when it does so, it only ever takes up to 45 nodes from it, tries to connect, and only considers the rest when it asks for everything yet again. A connection may be blacklisted for just 6 minutes before being attempted again, but the PeerManagerActor doesn't do that on its own AFAIK, so this could be a way of trying again. To put it another way: it wouldn't ask if it didn't need one. And you're right, it only uses this method if it wants to replace a connection it created as a result of DiscoveredNodesInfo, to "re-connect", albeit with a (hopefully) different node.
Also, here we cannot be certain that it's the PeerManagerActor asking for a random node.
In general a random node is also not unlikely to have been seen before in the case when we do random lookups, since the same nodes can be part of different search results.
      consumer.pull
        .flatMap {
          case Left(None) =>
            Task.raiseError(new IllegalStateException("The random node source is finished."))
Maybe I'm missing something, but don't these exceptions result in Status.Failure being sent to the PeerManagerActor when piping the future to it?
If so, wouldn't it be better to handle them here and not send anything if that happened?
Correct, if an exception is raised, Status.Failure is sent to the calling actor, which in our case is ignored by the PeerManagerActor. The error would be logged by pipeToRecipient, but this situation will never happen as the Iterant will never finish, it will keep repeating itself. I only included it here to cover all cases for the compiler; there's no need to handle anything, as it would be a programming error if this happened. I found though that returning errors helps in tests: instead of a timeout we get to see the error message when using ask patterns.
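For anyone less familiar with the pattern, a generic illustration of piping a failed Future to an actor (this is plain akka pipe, not Mantis' pipeToRecipient helper):

import akka.actor.{Actor, ActorSystem, Props, Status}
import akka.pattern.pipe
import scala.concurrent.Future

class RandomNodeRequester extends Actor {
  import context.dispatcher

  // A failed Future arrives at the recipient as a Status.Failure message.
  override def preStart(): Unit =
    Future
      .failed[Int](new IllegalStateException("The random node source is finished."))
      .pipeTo(self)

  def receive: Receive = {
    case Status.Failure(ex) => println(s"Ignoring failure: ${ex.getMessage}")
    case node: Int          => println(s"Would try to connect to node $node")
  }
}

object PipeFailureDemo extends App {
  val system = ActorSystem("pipe-demo")
  system.actorOf(Props(new RandomNodeRequester))
  // system.terminate() once done observing the output
}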
Description
The PR adds a mechanism to PeerManagerActor and PeerDiscoveryManager that should speed up the initial discovery of peers: whenever a peer disconnects from our node, a random replacement peer is sought, fed from an infinite stream of random lookups. This works in tandem with the periodic attempt to connect to a number of peers that Mantis does now.
Changed the default settings so we use lower request timeouts, because with the defaults a single lookup round took ~30 seconds; with the new settings it's closer to 10 seconds. The node discovered 1000 nodes in 4 minutes (down from 10 minutes before), and 2000 nodes by the 10 minute mark.
The PR also changes the enrollment so that it re-connects to previously known nodes, not just the configured bootstraps. This is in addition to the PeerDiscoveryManager serving them as "already discovered", so the benefit we can get is that if they don't want to connect to us any more, we may discover replacement nodes quicker.
Proposed Solution
The problem with the new discovery mechanism is that it looks up nodes on a fixed schedule and the number of nodes discovered and attempted to connect with in each round is limited; however, most nodes on etc already have TooManyPeers and disconnect from us, and the PeerManagerActor only attempts another round every so often (it was 10 seconds so far). We can ramp up the discovery interval (a different timer), but Mantis only wants to connect to 45 nodes; once that's reached there's no need to rapidly fire lookup requests at peers.
With the PR's changes, the node performs random lookups on demand as long as it hasn't reached 45 outgoing connections. After that, the more relaxed discovery interval of 15 minutes by default should be sufficient. It does this by asking for one random node to replace any dropped connection, which is served by doing a random lookup (which returns up to 16 nodes) and keeping the results buffered (up to 45 nodes in the buffer).
Alternatives
Another, simpler approach I considered was what trinity is doing, which is to add the number of required peers and a skip function to GetDiscoveredNodesInfo, letting the PeerDiscoveryManager sift through the nodes it has to try and find enough peers, returning however many it found, and kicking off a random lookup in the background if there aren't enough, unless one is running already.
The reason I thought this would yield fewer peers than the one-for-one replacement strategy in the PR is that GetDiscoveredNodesInfo is only sent on a fixed schedule. If it takes much less time to discard candidates than the schedule interval, the PeerManagerActor would stay idle for longer stretches.
For example this could happen:
Compared to this, the PR continuously runs background lookups as long as the random nodes it found are being consumed, which happens as soon as a node cannot be connected to.
Regarding leaving both the existing, periodic mechanism in PeerManagerActor and adding the new like-for-like replacement strategy on top, instead of finding a unified model: @KonradStaniec expressed some anxiety over the risks that come with changing the actor, so I thought this is the lightest touch. In theory we could rely solely on the random peer dispenser (ask for 45 in the beginning and keep replacing them), but that would be more difficult to prove correct. Since the existing way didn't care about occasional overflows, it looks like it should be able to handle the new random nodes as well.
Testing
Connected to etc:
Then looking in the logs at ~/.mantis/etc/logs/mantis.log