Skip to content

Commit 4f577df

Browse files
authored
fix: reduce new peer notifications and subscribe to topics when skipping sync (#1075)
1 parent e90cb93 commit 4f577df

File tree

3 files changed

+27
-26
lines changed

3 files changed

+27
-26
lines changed

lib/lambda_ethereum_consensus/beacon/sync_blocks.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
2929
initial_slot = Misc.compute_start_slot_at_epoch(checkpoint.epoch) + 1
3030
last_slot = BeaconChain.get_current_slot()
3131

32+
# If we're around genesis, we consider ourselves synced
3233
if last_slot > 0 do
3334
Enum.chunk_every(initial_slot..last_slot, @blocks_per_chunk)
3435
|> Enum.map(fn chunk ->
@@ -38,6 +39,8 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
3839
%{from: first_slot, count: count}
3940
end)
4041
|> perform_sync()
42+
else
43+
start_subscriptions()
4144
end
4245
end
4346

lib/lambda_ethereum_consensus/p2p/peerbook.ex

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,13 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do
5454

5555
@impl true
5656
def handle_info({:new_peer, peer_id}, peerbook) do
57-
:telemetry.execute([:peers, :connection], %{id: peer_id}, %{result: "success"})
58-
updated_peerbook = Map.put(peerbook, peer_id, @initial_score)
59-
{:noreply, updated_peerbook}
57+
if Map.has_key?(peerbook, peer_id) do
58+
{:noreply, peerbook}
59+
else
60+
:telemetry.execute([:peers, :connection], %{id: peer_id}, %{result: "success"})
61+
updated_peerbook = Map.put(peerbook, peer_id, @initial_score)
62+
{:noreply, updated_peerbook}
63+
end
6064
end
6165

6266
@impl true

native/libp2p_port/internal/discovery/discovery.go

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import (
99
"libp2p_port/internal/reqresp"
1010
"libp2p_port/internal/utils"
1111
"net"
12+
"time"
1213

1314
"github.com/ethereum/go-ethereum/p2p/discover"
1415
"github.com/ethereum/go-ethereum/p2p/enode"
1516
"github.com/ethereum/go-ethereum/p2p/enr"
17+
"github.com/libp2p/go-libp2p/core/network"
1618
"github.com/libp2p/go-libp2p/core/peer"
1719
"github.com/libp2p/go-libp2p/core/peerstore"
1820
ma "github.com/multiformats/go-multiaddr"
@@ -71,30 +73,18 @@ func lookForPeers(iter enode.Iterator, listener *reqresp.Listener, forkUpdates c
7173
currentForkDigest := <-forkUpdates
7274
for iter.Next() {
7375
node := iter.Node()
76+
time.Sleep(1 * time.Millisecond)
7477
updateForkDigest(currentForkDigest[:], forkUpdates)
75-
if !filterPeer(node, currentForkDigest[:]) {
76-
continue
77-
}
78-
var addrArr []string
79-
if node.TCP() != 0 {
80-
str := fmt.Sprintf("/ip4/%s/tcp/%d", node.IP(), node.TCP())
81-
addrArr = append(addrArr, str)
82-
} else if node.UDP() != 0 {
83-
str := fmt.Sprintf("/ip4/%s/udp/%d/quic", node.IP(), node.UDP())
84-
addrArr = append(addrArr, str)
85-
} else {
86-
continue
87-
}
88-
key, err := utils.ConvertToInterfacePubkey(node.Pubkey())
89-
if err != nil {
78+
79+
if !filterPeer(node, currentForkDigest[:], listener) {
9080
continue
9181
}
92-
nodeID, err := peer.IDFromPublicKey(key)
82+
addrInfo, err := convertToAddrInfo(node)
9383
if err != nil {
9484
continue
9585
}
9686
go func() {
97-
listener.AddPeer([]byte(nodeID), addrArr, peerstore.PermanentAddrTTL)
87+
listener.AddPeerWithAddrInfo(*addrInfo, peerstore.PermanentAddrTTL)
9888
}()
9989
}
10090
}
@@ -117,11 +107,11 @@ func updateForkDigest(currentForkDigest []byte, forkUpdates chan [4]byte) {
117107
// connect to.
118108
// 2. Peer has a valid IP and TCP port set in their enr.
119109
// 3. ~Peer hasn't been marked as 'bad'~
120-
// 4. ~Peer is not currently active or connected.~
110+
// 4. Peer is not currently active or connected.
121111
// 5. ~Peer is ready to receive incoming connections.~
122112
// 6. Peer's fork digest in their ENR matches that of
123113
// our localnodes.
124-
func filterPeer(node *enode.Node, currentForkDigest []byte) bool {
114+
func filterPeer(node *enode.Node, currentForkDigest []byte, listener *reqresp.Listener) bool {
125115
// Ignore nil node entries passed in.
126116
if node == nil {
127117
return false
@@ -135,6 +125,10 @@ func filterPeer(node *enode.Node, currentForkDigest []byte) bool {
135125
if err := nodeENR.Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
136126
return false
137127
}
128+
peerData, err := convertToAddrInfo(node)
129+
if err != nil || listener.Host().Network().Connectedness(peerData.ID) == network.Connected {
130+
return false
131+
}
138132
// Decide whether or not to connect to peer that does not
139133
// match the proper fork ENR data with our local node.
140134
sszEncodedForkEntry := make([]byte, 16)
@@ -195,16 +189,16 @@ func updateEnr(localNode *enode.LocalNode, e proto_helpers.Enr) {
195189
localNode.Set(enr.WithEntry("syncnets", e.Syncnets))
196190
}
197191

198-
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) {
192+
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, error) {
199193
multiAddr, err := convertToSingleMultiAddr(node)
200194
if err != nil {
201-
return nil, nil, err
195+
return nil, err
202196
}
203197
info, err := peer.AddrInfoFromP2pAddr(multiAddr)
204198
if err != nil {
205-
return nil, nil, err
199+
return nil, err
206200
}
207-
return info, multiAddr, nil
201+
return info, nil
208202
}
209203

210204
func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) {

0 commit comments

Comments
 (0)