Skip to content

Commit fa09166

Browse files
committed
Merge remote-tracking branch 'origin/phase/beta1' into fix/downloadMptFirst
2 parents cb7a1b5 + cdd7533 commit fa09166

File tree

3 files changed

+112
-40
lines changed

3 files changed

+112
-40
lines changed

src/main/scala/io/iohk/ethereum/network/PeerActor.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,20 @@ package io.iohk.ethereum.network
22

33
import java.net.{InetSocketAddress, URI}
44

5+
import akka.actor.SupervisorStrategy.Escalate
56
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
67
import akka.actor._
7-
import akka.agent.Agent
8-
import akka.util.ByteString
98
import io.iohk.ethereum.network.p2p._
109
import io.iohk.ethereum.network.p2p.messages.WireProtocol._
1110
import io.iohk.ethereum.network.p2p.messages.Versions
1211
import io.iohk.ethereum.network.rlpx.{AuthHandshaker, RLPxConnectionHandler}
1312
import io.iohk.ethereum.network.PeerActor.Status._
1413
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.{MessageFromPeer, PeerHandshakeSuccessful}
1514
import io.iohk.ethereum.network.PeerEventBusActor.Publish
16-
import io.iohk.ethereum.utils.NodeStatus
1715
import io.iohk.ethereum.network.handshaker.Handshaker
1816
import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeComplete.{HandshakeFailure, HandshakeSuccess}
1917
import io.iohk.ethereum.network.handshaker.Handshaker.{HandshakeResult, NextMessage}
2018
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
21-
import org.spongycastle.crypto.AsymmetricCipherKeyPair
2219
import org.spongycastle.util.encoders.Hex
2320

2421

@@ -43,6 +40,11 @@ class PeerActor[R <: HandshakeResult](
4340
import PeerActor._
4441
import context.{dispatcher, system}
4542

43+
override val supervisorStrategy: OneForOneStrategy =
44+
OneForOneStrategy() {
45+
case _ => Escalate
46+
}
47+
4648
def scheduler: Scheduler = externalSchedulerOpt getOrElse system.scheduler
4749

4850
val peerId: PeerId = PeerId(self.path.name)

src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ class RLPxConnectionHandler(
4242

4343
override def receive: Receive = waitingForCommand
4444

45+
def tcpActor: ActorRef = IO(Tcp)
46+
4547
def waitingForCommand: Receive = {
4648
case ConnectTo(uri) =>
47-
IO(Tcp) ! Connect(new InetSocketAddress(uri.getHost, uri.getPort))
49+
tcpActor ! Connect(new InetSocketAddress(uri.getHost, uri.getPort))
4850
context become waitingForConnectionResult(uri)
4951

5052
case HandleConnection(connection) =>
@@ -74,42 +76,64 @@ class RLPxConnectionHandler(
7476
handleTimeout orElse handleConnectionClosed orElse {
7577
case Received(data) =>
7678
timeout.cancel()
77-
Try(handshaker.handleInitialMessage(data.take(InitiatePacketLength))) match {
78-
case Success((responsePacket, result)) =>
79-
// process pre-eip8 message
80-
val remainingData = data.drop(InitiatePacketLength)
81-
connection ! Write(responsePacket)
82-
processHandshakeResult(result, remainingData)
79+
val maybePreEIP8Result = Try {
80+
val (responsePacket, result) = handshaker.handleInitialMessage(data.take(InitiatePacketLength))
81+
val remainingData = data.drop(InitiatePacketLength)
82+
(responsePacket, result, remainingData)
83+
}
84+
lazy val maybePostEIP8Result = Try {
85+
val (packetData, remainingData) = decodeV4Packet(data)
86+
val (responsePacket, result) = handshaker.handleInitialMessageV4(packetData)
87+
(responsePacket, result, remainingData)
88+
}
8389

84-
case Failure(_) =>
85-
// process as eip8 message
86-
val encryptedPayloadSize = ByteUtils.bigEndianToShort(data.take(2).toArray)
87-
val (packetData, remainingData) = data.splitAt(encryptedPayloadSize + 2)
88-
val (responsePacket, result) = handshaker.handleInitialMessageV4(packetData)
90+
maybePreEIP8Result orElse maybePostEIP8Result match {
91+
case Success((responsePacket, result, remainingData)) =>
8992
connection ! Write(responsePacket)
9093
processHandshakeResult(result, remainingData)
94+
95+
case Failure(ex) =>
96+
log.debug(s"[Stopping Connection] Init AuthHandshaker message handling failed for peer $peerId due to ${ex.getMessage}")
97+
context.parent ! ConnectionFailed
98+
context stop self
9199
}
92100
}
93101

94102
def waitingForAuthHandshakeResponse(handshaker: AuthHandshaker, timeout: Cancellable): Receive =
95103
handleWriteFailed orElse handleTimeout orElse handleConnectionClosed orElse {
96104
case Received(data) =>
97105
timeout.cancel()
98-
Try(handshaker.handleResponseMessage(data.take(ResponsePacketLength))) match {
99-
case Success(result) =>
100-
// process pre-eip8 message
101-
val remainingData = data.drop(ResponsePacketLength)
102-
processHandshakeResult(result, remainingData)
103-
104-
case Failure(_) =>
105-
// process as eip8 message
106-
val size = ByteUtils.bigEndianToShort(data.take(2).toArray)
107-
val (packetData, remainingData) = data.splitAt(size + 2)
108-
val result = handshaker.handleResponseMessageV4(packetData)
109-
processHandshakeResult(result, remainingData)
106+
val maybePreEIP8Result = Try {
107+
val result = handshaker.handleResponseMessage(data.take(ResponsePacketLength))
108+
val remainingData = data.drop(ResponsePacketLength)
109+
(result, remainingData)
110+
}
111+
val maybePostEIP8Result = Try {
112+
val (packetData, remainingData) = decodeV4Packet(data)
113+
val result = handshaker.handleResponseMessageV4(packetData)
114+
(result, remainingData)
115+
}
116+
maybePreEIP8Result orElse maybePostEIP8Result match {
117+
case Success((result, remainingData)) => processHandshakeResult(result, remainingData)
118+
case Failure(ex) =>
119+
log.debug(s"[Stopping Connection] Response AuthHandshaker message handling failed for peer $peerId due to ${ex.getMessage}")
120+
context.parent ! ConnectionFailed
121+
context stop self
110122
}
111123
}
112124

125+
/**
126+
* Decode V4 packet
127+
*
128+
* @param data, includes both the V4 packet with bytes from next messages
129+
* @return data of the packet and the remaining data
130+
*/
131+
private def decodeV4Packet(data: ByteString): (ByteString, ByteString) = {
132+
val encryptedPayloadSize = ByteUtils.bigEndianToShort(data.take(2).toArray)
133+
val (packetData, remainingData) = data.splitAt(encryptedPayloadSize + 2)
134+
packetData -> remainingData
135+
}
136+
113137
def handleTimeout: Receive = {
114138
case AuthHandshakeTimeout =>
115139
log.debug(s"[Stopping Connection] Auth handshake timeout for peer $peerId")

src/test/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandlerSpec.scala

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.iohk.ethereum.network.rlpx
22

3-
import akka.actor.{ActorSystem, Props}
3+
import java.net.{InetSocketAddress, URI}
4+
5+
import akka.actor.{ActorRef, ActorSystem, Props}
46
import akka.io.Tcp
57
import akka.testkit.{TestActorRef, TestProbe}
68
import akka.util.ByteString
@@ -20,7 +22,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory
2022

2123
it should "write messages send to TCP connection" in new TestSetup {
2224

23-
setupRLPxConnection()
25+
setupIncomingRLPxConnection()
2426

2527
(mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded"))
2628
rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping())
@@ -32,7 +34,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory
3234

3335
(mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes()
3436

35-
setupRLPxConnection()
37+
setupIncomingRLPxConnection()
3638

3739
//Send first message
3840
rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping())
@@ -51,7 +53,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory
5153

5254
(mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes()
5355

54-
setupRLPxConnection()
56+
setupIncomingRLPxConnection()
5557

5658
//Send several messages
5759
rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping())
@@ -76,9 +78,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory
7678
it should "close the connection when Ack timeout happens" in new TestSetup {
7779
(mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes()
7880

79-
setupRLPxConnection()
80-
81-
rlpxConnectionParent watch rlpxConnection
81+
setupIncomingRLPxConnection()
8282

8383
rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping())
8484
connection.expectMsg(Tcp.Write(ByteString("ping encoded"), RLPxConnectionHandler.Ack))
@@ -90,7 +90,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory
9090
it should "ignore timeout of old messages" in new TestSetup {
9191
(mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes()
9292

93-
setupRLPxConnection()
93+
setupIncomingRLPxConnection()
9494

9595
rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping()) //With SEQ number 0
9696
rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping()) //With SEQ number 1
@@ -111,6 +111,44 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory
111111
connection.expectMsg(Tcp.Write(ByteString("ping encoded"), RLPxConnectionHandler.Ack))
112112
}
113113

114+
it should "close the connection if the AuthHandshake init message's MAC is invalid" in new TestSetup {
115+
//Incomming connection arrives
116+
rlpxConnection ! RLPxConnectionHandler.HandleConnection(connection.ref)
117+
connection.expectMsgClass(classOf[Tcp.Register])
118+
119+
//AuthHandshaker throws exception on initial message
120+
(mockHandshaker.handleInitialMessage _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")}
121+
(mockHandshaker.handleInitialMessageV4 _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")}
122+
123+
val data = ByteString((0 until AuthHandshaker.InitiatePacketLength).map(_.toByte).toArray)
124+
rlpxConnection ! Tcp.Received(data)
125+
rlpxConnectionParent.expectMsg(RLPxConnectionHandler.ConnectionFailed)
126+
rlpxConnectionParent.expectTerminated(rlpxConnection)
127+
}
128+
129+
it should "close the connection if the AuthHandshake response message's MAC is invalid" in new TestSetup {
130+
//Outgoing connection request arrives
131+
rlpxConnection ! RLPxConnectionHandler.ConnectTo(uri)
132+
tcpActorProbe.expectMsg(Tcp.Connect(inetAddress))
133+
134+
//The TCP connection results are handled
135+
val initPacket = ByteString("Init packet")
136+
(mockHandshaker.initiate _).expects(uri).returning(initPacket -> mockHandshaker)
137+
138+
tcpActorProbe.reply(Tcp.Connected(inetAddress, inetAddress))
139+
tcpActorProbe.expectMsg(Tcp.Register(rlpxConnection))
140+
tcpActorProbe.expectMsg(Tcp.Write(initPacket))
141+
142+
//AuthHandshaker handles the response message (that throws an invalid MAC)
143+
(mockHandshaker.handleResponseMessage _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")}
144+
(mockHandshaker.handleResponseMessageV4 _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")}
145+
146+
val data = ByteString((0 until AuthHandshaker.ResponsePacketLength).map(_.toByte).toArray)
147+
rlpxConnection ! Tcp.Received(data)
148+
rlpxConnectionParent.expectMsg(RLPxConnectionHandler.ConnectionFailed)
149+
rlpxConnectionParent.expectTerminated(rlpxConnection)
150+
}
151+
114152
trait TestSetup extends MockFactory with SecureRandomBuilder {
115153
implicit val system = ActorSystem("RLPxHandlerSpec_System")
116154

@@ -124,19 +162,27 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory
124162
val connection = TestProbe()
125163
val mockMessageCodec = mock[MessageCodec]
126164

165+
val uri = new URI("enode://18a551bee469c2e02de660ab01dede06503c986f6b8520cb5a65ad122df88b17b285e3fef09a40a0d44f99e014f8616cf1ebc2e094f96c6e09e2f390f5d34857@47.90.36.129:30303")
166+
val inetAddress = new InetSocketAddress(uri.getHost, uri.getPort)
167+
127168
val rlpxConfiguration = new RLPxConfiguration {
128169
override val waitForTcpAckTimeout: FiniteDuration = Timeouts.normalTimeout
129-
override val waitForHandshakeTimeout: FiniteDuration = Timeouts.normalTimeout
170+
171+
//unused
172+
override val waitForHandshakeTimeout: FiniteDuration = Timeouts.veryLongTimeout
130173
}
131174

175+
val tcpActorProbe = TestProbe()
132176
val rlpxConnectionParent = TestProbe()
133177
val rlpxConnection = TestActorRef(
134-
Props(new RLPxConnectionHandler(
135-
mockMessageDecoder, protocolVersion, mockHandshaker, (_, _, _) => mockMessageCodec, rlpxConfiguration)),
178+
Props(new RLPxConnectionHandler(mockMessageDecoder, protocolVersion, mockHandshaker, (_, _, _) => mockMessageCodec, rlpxConfiguration) {
179+
override def tcpActor: ActorRef = tcpActorProbe.ref
180+
}),
136181
rlpxConnectionParent.ref)
182+
rlpxConnectionParent watch rlpxConnection
137183

138184
//Setup for RLPxConnection, after it the RLPxConnectionHandler is in a handshaked state
139-
def setupRLPxConnection(): Unit = {
185+
def setupIncomingRLPxConnection(): Unit = {
140186
//Start setting up connection
141187
rlpxConnection ! RLPxConnectionHandler.HandleConnection(connection.ref)
142188
connection.expectMsgClass(classOf[Tcp.Register])

0 commit comments

Comments
 (0)