Skip to content

Commit 06c17e3

Browse files
[ETCM-355] RLPxConnectionHandler.processMessage doesn't use Try
1 parent 130d371 commit 06c17e3

File tree

5 files changed

+24
-16
lines changed

5 files changed

+24
-16
lines changed

src/main/scala/io/iohk/ethereum/network/p2p/Message.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ trait MessageSerializable extends Message {
2323

2424
@FunctionalInterface
2525
trait MessageDecoder extends Logger { self =>
26-
27-
type DecodingError = Throwable // TODO: Replace Throwable with an ADT when feasible
26+
import MessageDecoder._
2827

2928
def fromBytes(`type`: Int, payload: Array[Byte]): Either[DecodingError, Message]
3029

@@ -41,3 +40,7 @@ trait MessageDecoder extends Logger { self =>
4140
}
4241
}
4342
}
43+
44+
object MessageDecoder {
45+
type DecodingError = Throwable // TODO: Replace Throwable with an ADT when feasible
46+
}

src/main/scala/io/iohk/ethereum/network/p2p/MessageDecoders.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.iohk.ethereum.network.p2p.messages.WireProtocol.Ping._
1919
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Pong._
2020
import io.iohk.ethereum.network.p2p.messages.WireProtocol._
2121
import scala.util.Try
22+
import MessageDecoder._
2223

2324
object NetworkMessageDecoder extends MessageDecoder {
2425

src/main/scala/io/iohk/ethereum/network/rlpx/MessageCodec.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.xerial.snappy.Snappy
1313
import io.iohk.ethereum.network.handshaker.EtcHelloExchangeState
1414
import io.iohk.ethereum.network.p2p.Message
1515
import io.iohk.ethereum.network.p2p.MessageDecoder
16+
import io.iohk.ethereum.network.p2p.MessageDecoder.DecodingError
1617
import io.iohk.ethereum.network.p2p.MessageSerializable
1718
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Hello
1819

@@ -32,12 +33,12 @@ class MessageCodec(
3233
val contextIdCounter = new AtomicInteger
3334

3435
// TODO: ETCM-402 - messageDecoder should use negotiated protocol version
35-
def readMessages(data: ByteString): Seq[Try[Message]] = {
36+
def readMessages(data: ByteString): Seq[Either[DecodingError, Message]] = {
3637
val frames = frameCodec.readFrames(data)
3738
readFrames(frames)
3839
}
3940

40-
def readFrames(frames: Seq[Frame]): Seq[Try[Message]] =
41+
def readFrames(frames: Seq[Frame]): Seq[Either[DecodingError, Message]] =
4142
frames.map { frame =>
4243
val frameData = frame.payload.toArray
4344
val payloadTry =
@@ -47,8 +48,8 @@ class MessageCodec(
4748
Success(frameData)
4849
}
4950

50-
payloadTry.map { payload =>
51-
messageDecoder.fromBytesUnsafe(frame.`type`, payload)
51+
payloadTry.toEither.flatMap { payload =>
52+
messageDecoder.fromBytes(frame.`type`, payload)
5253
}
5354
}
5455

src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import org.bouncycastle.util.encoders.Hex
1919

2020
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
2121
import io.iohk.ethereum.network.p2p.Message
22+
import io.iohk.ethereum.network.p2p.MessageDecoder._
2223
import io.iohk.ethereum.network.p2p.MessageSerializable
2324
import io.iohk.ethereum.network.p2p.NetworkMessageDecoder
2425
import io.iohk.ethereum.network.p2p.messages.Capability
@@ -255,11 +256,11 @@ class RLPxConnectionHandler(
255256
messagesSoFar.foreach(processMessage)
256257
}
257258

258-
def processMessage(messageTry: Try[Message]): Unit = messageTry match {
259-
case Success(message) =>
259+
def processMessage(messageTry: Either[DecodingError, Message]): Unit = messageTry match {
260+
case Right(message) =>
260261
context.parent ! MessageReceived(message)
261262

262-
case Failure(ex) =>
263+
case Left(ex) =>
263264
log.info("Cannot decode message from {}, because of {}", peerId, ex.getMessage)
264265
// break connection in case of failed decoding, to avoid attack which would send us garbage
265266
context.stop(self)
@@ -450,8 +451,10 @@ object RLPxConnectionHandler {
450451
private def extractHello(frame: Frame): Option[Hello] = {
451452
val frameData = frame.payload.toArray
452453
if (frame.`type` == Hello.code) {
453-
val m = NetworkMessageDecoder.fromBytesUnsafe(frame.`type`, frameData)
454-
Some(m.asInstanceOf[Hello])
454+
NetworkMessageDecoder.fromBytes(frame.`type`, frameData) match {
455+
case Left(err) => throw err // TODO: rethink throwing here
456+
case Right(msg) => Some(msg.asInstanceOf[Hello])
457+
}
455458
} else {
456459
None
457460
}

src/test/scala/io/iohk/ethereum/network/p2p/MessageCodecSpec.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class MessageCodecSpec extends AnyFlatSpec with Matchers {
2525

2626
// remote peer did not receive local status so it treats all remote messages as uncompressed
2727
assert(remoteReadNotCompressedStatus.size == 1)
28-
assert(remoteReadNotCompressedStatus.head.get == status)
28+
assert(remoteReadNotCompressedStatus.head == Right(status))
2929
}
3030

3131
it should "compress messages when remote side advertises p2p version larger or equal 5" in new TestSetup {
@@ -41,7 +41,7 @@ class MessageCodecSpec extends AnyFlatSpec with Matchers {
4141
// remote peer did not receive local status so it treats all remote messages as uncompressed,
4242
// but local peer compress messages after V5 Hello message
4343
assert(remoteReadNotCompressedStatus.size == 1)
44-
assert(remoteReadNotCompressedStatus.head.isFailure)
44+
assert(remoteReadNotCompressedStatus.head.isLeft)
4545
}
4646

4747
it should "compress messages when both sides advertises p2p version larger or equal 5" in new TestSetup {
@@ -56,7 +56,7 @@ class MessageCodecSpec extends AnyFlatSpec with Matchers {
5656

5757
// both peers exchanged v5 hellos, so they should send compressed messages
5858
assert(remoteReadNextMessageAfterHello.size == 1)
59-
assert(remoteReadNextMessageAfterHello.head.get == status)
59+
assert(remoteReadNextMessageAfterHello.head == Right(status))
6060
}
6161

6262
it should "compress and decompress first message after hello when receiving 2 frames" in new TestSetup {
@@ -72,8 +72,8 @@ class MessageCodecSpec extends AnyFlatSpec with Matchers {
7272

7373
// both peers exchanged v5 hellos, so they should send compressed messages
7474
assert(remoteReadBothMessages.size == 2)
75-
assert(remoteReadBothMessages.head.get == helloV5)
76-
assert(remoteReadBothMessages.last.get == status)
75+
assert(remoteReadBothMessages.head == Right(helloV5))
76+
assert(remoteReadBothMessages.last == Right(status))
7777
}
7878

7979
trait TestSetup extends SecureChannelSetup {

0 commit comments

Comments
 (0)