Skip to content

Commit c3abf45

Browse files
authored
[ETCM-841] Consume remaining data when extracting Hello in order (#1019)
* [ETCM-841] Consume remaining data when extracting Hello in order * [ETCM-841] Align unit test with changes in protocol * [ETCM-841] Apply PR remarks
1 parent 4f9c093 commit c3abf45

File tree

2 files changed

+45
-52
lines changed

2 files changed

+45
-52
lines changed

src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala

Lines changed: 43 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class RLPxConnectionHandler(
104104

105105
case Failure(ex) =>
106106
log.debug(
107-
s"[Stopping Connection] Init AuthHandshaker message handling failed for peer {} due to {}",
107+
"[Stopping Connection] Init AuthHandshaker message handling failed for peer {} due to {}",
108108
peerId,
109109
ex.getMessage
110110
)
@@ -132,7 +132,7 @@ class RLPxConnectionHandler(
132132

133133
case Failure(ex) =>
134134
log.debug(
135-
s"[Stopping Connection] Response AuthHandshaker message handling failed for peer {} due to {}",
135+
"[Stopping Connection] Response AuthHandshaker message handling failed for peer {} due to {}",
136136
peerId,
137137
ex.getMessage
138138
)
@@ -154,24 +154,25 @@ class RLPxConnectionHandler(
154154
}
155155

156156
def handleTimeout: Receive = { case AuthHandshakeTimeout =>
157-
log.debug(s"[Stopping Connection] Auth handshake timeout for peer {}", peerId)
157+
log.debug("[Stopping Connection] Auth handshake timeout for peer {}", peerId)
158158
context.parent ! ConnectionFailed
159159
context stop self
160160
}
161161

162162
def processHandshakeResult(result: AuthHandshakeResult, remainingData: ByteString): Unit =
163163
result match {
164164
case AuthHandshakeSuccess(secrets, remotePubKey) =>
165-
log.debug(s"Auth handshake succeeded for peer {}", peerId)
165+
log.debug("Auth handshake succeeded for peer {}", peerId)
166166
context.parent ! ConnectionEstablished(remotePubKey)
167-
if (remainingData.nonEmpty)
168-
context.self ! Received(remainingData)
169167
// following the specification at https://github.com/ethereum/devp2p/blob/master/rlpx.md#initial-handshake
170168
// point 6 indicates that the next messages needs to be initial 'Hello'
171-
context become awaitInitialHello(extractor(secrets))
169+
// Unfortunately it is hard to figure out the proper order for messages to be handled in.
170+
// FrameCodec assumes that bytes will arrive in the expected order
171+
// To alleviate potential lapses in order each chunk of data needs to be passed to FrameCodec immediately
172+
extractHello(extractor(secrets), remainingData)
172173

173174
case AuthHandshakeError =>
174-
log.debug(s"[Stopping Connection] Auth handshake failed for peer {}", peerId)
175+
log.debug("[Stopping Connection] Auth handshake failed for peer {}", peerId)
175176
context.parent ! ConnectionFailed
176177
context stop self
177178
}
@@ -181,45 +182,37 @@ class RLPxConnectionHandler(
181182
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
182183
seqNumber: Int = 0
183184
): Receive =
184-
handleWriteFailed orElse handleConnectionClosed orElse handleSendHello(
185-
extractor,
186-
cancellableAckTimeout,
187-
seqNumber
188-
) orElse handleReceiveHello(extractor, cancellableAckTimeout, seqNumber)
189-
190-
private def handleSendHello(
191-
extractor: HelloCodec,
192-
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
193-
seqNumber: Int = 0
194-
): Receive = {
195-
// TODO when cancellableAckTimeout is Some
196-
case SendMessage(h: HelloEnc) =>
197-
val out = extractor.writeHello(h)
198-
connection ! Write(out, Ack)
199-
val timeout =
200-
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
201-
context become awaitInitialHello(
202-
extractor,
203-
Some(CancellableAckTimeout(seqNumber, timeout)),
204-
increaseSeqNumber(seqNumber)
205-
)
206-
case Ack if cancellableAckTimeout.nonEmpty =>
207-
//Cancel pending message timeout
208-
cancellableAckTimeout.foreach(_.cancellable.cancel())
209-
context become awaitInitialHello(extractor, None, seqNumber)
210-
211-
case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
212-
cancellableAckTimeout.foreach(_.cancellable.cancel())
213-
log.error(s"[Stopping Connection] Sending 'Hello' to {} failed", peerId)
214-
context stop self
185+
handleWriteFailed orElse handleConnectionClosed orElse {
186+
// TODO when cancellableAckTimeout is Some
187+
case SendMessage(h: HelloEnc) =>
188+
val out = extractor.writeHello(h)
189+
connection ! Write(out, Ack)
190+
val timeout =
191+
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
192+
context become awaitInitialHello(
193+
extractor,
194+
Some(CancellableAckTimeout(seqNumber, timeout)),
195+
increaseSeqNumber(seqNumber)
196+
)
197+
case Ack if cancellableAckTimeout.nonEmpty =>
198+
//Cancel pending message timeout
199+
cancellableAckTimeout.foreach(_.cancellable.cancel())
200+
context become awaitInitialHello(extractor, None, seqNumber)
215201

216-
}
202+
case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
203+
cancellableAckTimeout.foreach(_.cancellable.cancel())
204+
log.error("[Stopping Connection] Sending 'Hello' to {} failed", peerId)
205+
context stop self
206+
case Received(data) =>
207+
extractHello(extractor, data, cancellableAckTimeout, seqNumber)
208+
}
217209

218-
private def handleReceiveHello(
210+
private def extractHello(
219211
extractor: HelloCodec,
212+
data: ByteString,
220213
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
221214
seqNumber: Int = 0
222-
): Receive = { case Received(data) =>
215+
): Unit = {
223216
extractor.readHello(data) match {
224217
case Some((hello, restFrames)) =>
225218
val messageCodecOpt = for {
@@ -236,12 +229,12 @@ class RLPxConnectionHandler(
236229
seqNumber = seqNumber
237230
)
238231
case None =>
239-
log.debug(s"[Stopping Connection] Unable to negotiate protocol with {}", peerId)
232+
log.debug("[Stopping Connection] Unable to negotiate protocol with {}", peerId)
240233
context.parent ! ConnectionFailed
241234
context stop self
242235
}
243236
case None =>
244-
log.debug(s"[Stopping Connection] Did not find 'Hello' in message from {}", peerId)
237+
log.debug("[Stopping Connection] Did not find 'Hello' in message from {}", peerId)
245238
context become awaitInitialHello(extractor, cancellableAckTimeout, seqNumber)
246239
}
247240
}
@@ -262,7 +255,7 @@ class RLPxConnectionHandler(
262255
context.parent ! MessageReceived(message)
263256

264257
case Failure(ex) =>
265-
log.info(s"Cannot decode message from {}, because of {}", peerId, ex.getMessage)
258+
log.info("Cannot decode message from {}, because of {}", peerId, ex.getMessage)
266259
// break connection in case of failed decoding, to avoid attack which would send us garbage
267260
context stop self
268261
}
@@ -310,7 +303,7 @@ class RLPxConnectionHandler(
310303

311304
case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
312305
cancellableAckTimeout.foreach(_.cancellable.cancel())
313-
log.debug(s"[Stopping Connection] Write to {} failed", peerId)
306+
log.debug("[Stopping Connection] Write to {} failed", peerId)
314307
context stop self
315308
}
316309
}
@@ -332,7 +325,7 @@ class RLPxConnectionHandler(
332325
): Unit = {
333326
val out = messageCodec.encodeMessage(messageToSend)
334327
connection ! Write(out, Ack)
335-
log.debug(s"Sent message: {} to {}", messageToSend.underlyingMsg.toShortString, peerId)
328+
log.debug("Sent message: {} to {}", messageToSend.underlyingMsg.toShortString, peerId)
336329

337330
val timeout = system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
338331
context become handshaked(
@@ -356,7 +349,7 @@ class RLPxConnectionHandler(
356349

357350
def handleWriteFailed: Receive = { case CommandFailed(cmd: Write) =>
358351
log.debug(
359-
s"[Stopping Connection] Write to peer {} failed, trying to send {}",
352+
"[Stopping Connection] Write to peer {} failed, trying to send {}",
360353
peerId,
361354
Hex.toHexString(cmd.data.toArray[Byte])
362355
)
@@ -365,10 +358,10 @@ class RLPxConnectionHandler(
365358

366359
def handleConnectionClosed: Receive = { case msg: ConnectionClosed =>
367360
if (msg.isPeerClosed) {
368-
log.debug(s"[Stopping Connection] Connection with {} closed by peer", peerId)
361+
log.debug("[Stopping Connection] Connection with {} closed by peer", peerId)
369362
}
370363
if (msg.isErrorClosed) {
371-
log.debug(s"[Stopping Connection] Connection with {} closed because of error {}", peerId, msg.getErrorCause)
364+
log.debug("[Stopping Connection] Connection with {} closed because of error {}", peerId, msg.getErrorCause)
372365
}
373366

374367
context stop self

src/test/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandlerSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,10 @@ class RLPxConnectionHandlerSpec
229229
.expects(data)
230230
.returning((response, AuthHandshakeSuccess(mock[Secrets], ByteString())))
231231
(mockHelloExtractor.readHello _)
232-
.expects(hello)
232+
.expects(ByteString.empty)
233233
.returning(Some(Hello(5, "", Capabilities.Eth63Capability::Nil, 30303, ByteString("abc")), Seq.empty))
234234
(mockMessageCodec.readMessages _)
235-
.expects(ByteString.empty)
235+
.expects(hello)
236236
.returning(Nil) //For processing of messages after handshaking finishes
237237

238238
rlpxConnection ! Tcp.Received(data)

0 commit comments

Comments
 (0)