Skip to content

Commit b395772

Browse files
committed
[ETCM-841] Consume remaining data when extracting Hello in order
1 parent b51076e commit b395772

File tree

1 file changed

+32
-39
lines changed

1 file changed

+32
-39
lines changed

src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,12 @@ class RLPxConnectionHandler(
164164
case AuthHandshakeSuccess(secrets, remotePubKey) =>
165165
log.debug(s"Auth handshake succeeded for peer {}", peerId)
166166
context.parent ! ConnectionEstablished(remotePubKey)
167-
if (remainingData.nonEmpty)
168-
context.self ! Received(remainingData)
169167
// following the specification at https://github.com/ethereum/devp2p/blob/master/rlpx.md#initial-handshake
170168
// point 6 indicates that the next messages needs to be initial 'Hello'
171-
context become awaitInitialHello(extractor(secrets))
169+
// Unfortunately it is hard to figure out the proper order for messages to be handled in.
170+
// FrameCodec assumes that bytes will arrive in the expected order
171+
// To alleviate potential lapses in order each chunk of data needs to be passed to FrameCodec immediately
172+
extractHello(extractor(secrets), remainingData, None, 0)
172173

173174
case AuthHandshakeError =>
174175
log.debug(s"[Stopping Connection] Auth handshake failed for peer {}", peerId)
@@ -181,45 +182,37 @@ class RLPxConnectionHandler(
181182
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
182183
seqNumber: Int = 0
183184
): Receive =
184-
handleWriteFailed orElse handleConnectionClosed orElse handleSendHello(
185-
extractor,
186-
cancellableAckTimeout,
187-
seqNumber
188-
) orElse handleReceiveHello(extractor, cancellableAckTimeout, seqNumber)
189-
190-
private def handleSendHello(
191-
extractor: HelloCodec,
192-
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
193-
seqNumber: Int = 0
194-
): Receive = {
195-
// TODO when cancellableAckTimeout is Some
196-
case SendMessage(h: HelloEnc) =>
197-
val out = extractor.writeHello(h)
198-
connection ! Write(out, Ack)
199-
val timeout =
200-
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
201-
context become awaitInitialHello(
202-
extractor,
203-
Some(CancellableAckTimeout(seqNumber, timeout)),
204-
increaseSeqNumber(seqNumber)
205-
)
206-
case Ack if cancellableAckTimeout.nonEmpty =>
207-
//Cancel pending message timeout
208-
cancellableAckTimeout.foreach(_.cancellable.cancel())
209-
context become awaitInitialHello(extractor, None, seqNumber)
210-
211-
case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
212-
cancellableAckTimeout.foreach(_.cancellable.cancel())
213-
log.error(s"[Stopping Connection] Sending 'Hello' to {} failed", peerId)
214-
context stop self
185+
handleWriteFailed orElse handleConnectionClosed orElse {
186+
// TODO when cancellableAckTimeout is Some
187+
case SendMessage(h: HelloEnc) =>
188+
val out = extractor.writeHello(h)
189+
connection ! Write(out, Ack)
190+
val timeout =
191+
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
192+
context become awaitInitialHello(
193+
extractor,
194+
Some(CancellableAckTimeout(seqNumber, timeout)),
195+
increaseSeqNumber(seqNumber)
196+
)
197+
case Ack if cancellableAckTimeout.nonEmpty =>
198+
//Cancel pending message timeout
199+
cancellableAckTimeout.foreach(_.cancellable.cancel())
200+
context become awaitInitialHello(extractor, None, seqNumber)
215201

216-
}
202+
case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
203+
cancellableAckTimeout.foreach(_.cancellable.cancel())
204+
log.error(s"[Stopping Connection] Sending 'Hello' to {} failed", peerId)
205+
context stop self
206+
case Received(data) =>
207+
extractHello(extractor, data, cancellableAckTimeout, seqNumber)
208+
}
217209

218-
private def handleReceiveHello(
210+
private def extractHello(
219211
extractor: HelloCodec,
220-
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
221-
seqNumber: Int = 0
222-
): Receive = { case Received(data) =>
212+
data: ByteString,
213+
cancellableAckTimeout: Option[CancellableAckTimeout],
214+
seqNumber: Int
215+
): Unit = {
223216
extractor.readHello(data) match {
224217
case Some((hello, restFrames)) =>
225218
val messageCodecOpt = for {

0 commit comments

Comments
 (0)