Skip to content

Commit 1ea4cff

Browse files
author
Nicolás Tallar
authored
[ETCM-178] Disallow duplicated connections and connections to self (#763)
- Disallow repeated connections and connections to self - Added further lazy vals to ConnectedPeers
1 parent 20d055d commit 1ea4cff

File tree

10 files changed

+284
-139
lines changed

10 files changed

+284
-139
lines changed

src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,13 @@ class DumpChainActor(
140140
val account = n.value.toArray[Byte].toAccount
141141

142142
if (account.codeHash != DumpChainActor.emptyEvm) {
143-
peers.headOption.foreach { case Peer(_, _, _) =>
143+
peers.headOption.foreach { _ =>
144144
evmTorequest = evmTorequest :+ account.codeHash
145145
evmCodeHashes = evmCodeHashes + account.codeHash
146146
}
147147
}
148148
if (account.storageRoot != DumpChainActor.emptyStorage) {
149-
peers.headOption.foreach { case Peer(_, _, _) =>
149+
peers.headOption.foreach { _ =>
150150
contractChildren = contractChildren :+ account.storageRoot
151151
contractNodesHashes = contractNodesHashes + account.storageRoot
152152
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package io.iohk.ethereum.network
2+
3+
import java.net.InetSocketAddress
4+
5+
import akka.actor.ActorRef
6+
import akka.util.ByteString
7+
8+
case class ConnectedPeers(
9+
private val incomingPendingPeers: Map[PeerId, Peer],
10+
private val outgoingPendingPeers: Map[PeerId, Peer],
11+
private val handshakedPeers: Map[PeerId, Peer]
12+
) {
13+
14+
// FIXME: Kept only for compatibility purposes, should eventually be removed
15+
lazy val peers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers
16+
17+
private lazy val allPeers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers ++ incomingPendingPeers
18+
19+
private lazy val allPeersRemoteAddresses: Set[InetSocketAddress] = allPeers.values.map(_.remoteAddress).toSet
20+
def isConnectionHandled(remoteAddress: InetSocketAddress): Boolean =
21+
allPeersRemoteAddresses.contains(remoteAddress)
22+
23+
/*
24+
We have the node id of our outgoing pending peers so we could use that in our checks, by rejecting a peer that
25+
handshaked to us with the same node id.
26+
However, with checking the node id of only handshaked peers we prioritize handshaked peers over pending ones,
27+
in the above mentioned case the repeated pending peer connection will eventually die out
28+
*/
29+
private lazy val handshakedPeersNodeIds: Set[ByteString] = handshakedPeers.values.flatMap(_.nodeId).toSet
30+
def hasHandshakedWith(nodeId: ByteString): Boolean =
31+
handshakedPeersNodeIds.contains(nodeId)
32+
33+
lazy val incomingPendingPeersCount: Int = incomingPendingPeers.size
34+
lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection }
35+
lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection }
36+
37+
lazy val handshakedPeersCount: Int = handshakedPeers.size
38+
lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeers.size
39+
40+
def getPeer(peerId: PeerId): Option[Peer] = peers.get(peerId)
41+
42+
def addNewPendingPeer(pendingPeer: Peer): ConnectedPeers = {
43+
if (pendingPeer.incomingConnection)
44+
copy(incomingPendingPeers = incomingPendingPeers + (pendingPeer.id -> pendingPeer))
45+
else
46+
copy(outgoingPendingPeers = outgoingPendingPeers + (pendingPeer.id -> pendingPeer))
47+
}
48+
49+
def promotePeerToHandshaked(peerAfterHandshake: Peer): ConnectedPeers = {
50+
if (peerAfterHandshake.incomingConnection)
51+
copy(
52+
incomingPendingPeers = incomingPendingPeers - peerAfterHandshake.id,
53+
handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake)
54+
)
55+
else
56+
copy(
57+
outgoingPendingPeers = outgoingPendingPeers - peerAfterHandshake.id,
58+
handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake)
59+
)
60+
}
61+
62+
def removeTerminatedPeer(peerRef: ActorRef): (Iterable[PeerId], ConnectedPeers) = {
63+
val peersId = allPeers.collect { case (id, peer) if peer.ref == peerRef => id }
64+
65+
(
66+
peersId,
67+
ConnectedPeers(incomingPendingPeers -- peersId, outgoingPendingPeers -- peersId, handshakedPeers -- peersId)
68+
)
69+
}
70+
}
71+
72+
object ConnectedPeers {
73+
def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty)
74+
}

src/main/scala/io/iohk/ethereum/network/peer.scala renamed to src/main/scala/io/iohk/ethereum/network/Peer.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,21 @@ package io.iohk.ethereum.network
33
import java.net.InetSocketAddress
44

55
import akka.actor.ActorRef
6+
import akka.util.ByteString
67
import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId
78

89
case class PeerId(value: String) extends BlackListId
910

10-
case class Peer(remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean) {
11+
object PeerId {
12+
def fromRef(ref: ActorRef): PeerId = PeerId(ref.path.name)
13+
}
14+
15+
case class Peer(
16+
remoteAddress: InetSocketAddress,
17+
ref: ActorRef,
18+
incomingConnection: Boolean,
19+
nodeId: Option[ByteString] = None
20+
) {
1121
// FIXME PeerId should be actual peerId i.e id derived form node public key
12-
def id: PeerId = PeerId(ref.path.name)
22+
def id: PeerId = PeerId.fromRef(ref)
1323
}

src/main/scala/io/iohk/ethereum/network/PeerActor.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ class PeerActor[R <: HandshakeResult](
4949

5050
def scheduler: Scheduler = externalSchedulerOpt getOrElse system.scheduler
5151

52-
val peerId: PeerId = PeerId(self.path.name)
53-
54-
val peer: Peer = Peer(peerAddress, self, incomingConnection)
52+
val peerId: PeerId = PeerId.fromRef(self)
5553

5654
override def receive: Receive = waitingForInitialCommand
5755

@@ -87,7 +85,7 @@ class PeerActor[R <: HandshakeResult](
8785
case RLPxConnectionHandler.ConnectionEstablished(remoteNodeId) =>
8886
val newUri =
8987
rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri))
90-
processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries)
88+
processHandshakerNextMessage(initHandshaker, remoteNodeId, rlpxConnection.copy(uriOpt = newUri), numRetries)
9189

9290
case RLPxConnectionHandler.ConnectionFailed =>
9391
log.debug("Failed to establish RLPx connection")
@@ -109,6 +107,7 @@ class PeerActor[R <: HandshakeResult](
109107

110108
def processingHandshaking(
111109
handshaker: Handshaker[R],
110+
remoteNodeId: ByteString,
112111
rlpxConnection: RLPxConnection,
113112
timeout: Cancellable,
114113
numRetries: Int
@@ -122,14 +121,14 @@ class PeerActor[R <: HandshakeResult](
122121
// handles the received message
123122
handshaker.applyMessage(msg).foreach { newHandshaker =>
124123
timeout.cancel()
125-
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
124+
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
126125
}
127126
handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend))
128127

129128
case ResponseTimeout =>
130129
timeout.cancel()
131130
val newHandshaker = handshaker.processTimeout
132-
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
131+
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
133132

134133
case GetStatus => sender() ! StatusResponse(Handshaking(numRetries))
135134

@@ -145,18 +144,19 @@ class PeerActor[R <: HandshakeResult](
145144
*/
146145
private def processHandshakerNextMessage(
147146
handshaker: Handshaker[R],
147+
remoteNodeId: ByteString,
148148
rlpxConnection: RLPxConnection,
149149
numRetries: Int
150150
): Unit =
151151
handshaker.nextMessage match {
152152
case Right(NextMessage(msgToSend, timeoutTime)) =>
153153
rlpxConnection.sendMessage(msgToSend)
154154
val newTimeout = scheduler.scheduleOnce(timeoutTime, self, ResponseTimeout)
155-
context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries)
155+
context become processingHandshaking(handshaker, remoteNodeId, rlpxConnection, newTimeout, numRetries)
156156

157157
case Left(HandshakeSuccess(handshakeResult)) =>
158158
rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager.AddKnownNode(uri) }
159-
context become new HandshakedPeer(rlpxConnection, handshakeResult).receive
159+
context become new HandshakedPeer(remoteNodeId, rlpxConnection, handshakeResult).receive
160160
unstashAll()
161161

162162
case Left(HandshakeFailure(reason)) =>
@@ -244,8 +244,9 @@ class PeerActor[R <: HandshakeResult](
244244
stash()
245245
}
246246

247-
class HandshakedPeer(rlpxConnection: RLPxConnection, handshakeResult: R) {
247+
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {
248248

249+
val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId))
249250
peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult))
250251

251252
/**

0 commit comments

Comments
 (0)