@@ -49,9 +49,7 @@ class PeerActor[R <: HandshakeResult](
49
49
50
50
def scheduler : Scheduler = externalSchedulerOpt getOrElse system.scheduler
51
51
52
- val peerId : PeerId = PeerId (self.path.name)
53
-
54
- val peer : Peer = Peer (peerAddress, self, incomingConnection)
52
+ val peerId : PeerId = PeerId .fromRef(self)
55
53
56
54
override def receive : Receive = waitingForInitialCommand
57
55
@@ -87,7 +85,7 @@ class PeerActor[R <: HandshakeResult](
87
85
case RLPxConnectionHandler .ConnectionEstablished (remoteNodeId) =>
88
86
val newUri =
89
87
rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri))
90
- processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries)
88
+ processHandshakerNextMessage(initHandshaker, remoteNodeId, rlpxConnection.copy(uriOpt = newUri), numRetries)
91
89
92
90
case RLPxConnectionHandler .ConnectionFailed =>
93
91
log.debug(" Failed to establish RLPx connection" )
@@ -109,6 +107,7 @@ class PeerActor[R <: HandshakeResult](
109
107
110
108
def processingHandshaking (
111
109
handshaker : Handshaker [R ],
110
+ remoteNodeId : ByteString ,
112
111
rlpxConnection : RLPxConnection ,
113
112
timeout : Cancellable ,
114
113
numRetries : Int
@@ -122,14 +121,14 @@ class PeerActor[R <: HandshakeResult](
122
121
// handles the received message
123
122
handshaker.applyMessage(msg).foreach { newHandshaker =>
124
123
timeout.cancel()
125
- processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
124
+ processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
126
125
}
127
126
handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend))
128
127
129
128
case ResponseTimeout =>
130
129
timeout.cancel()
131
130
val newHandshaker = handshaker.processTimeout
132
- processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
131
+ processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
133
132
134
133
case GetStatus => sender() ! StatusResponse (Handshaking (numRetries))
135
134
@@ -145,18 +144,19 @@ class PeerActor[R <: HandshakeResult](
145
144
*/
146
145
private def processHandshakerNextMessage (
147
146
handshaker : Handshaker [R ],
147
+ remoteNodeId : ByteString ,
148
148
rlpxConnection : RLPxConnection ,
149
149
numRetries : Int
150
150
): Unit =
151
151
handshaker.nextMessage match {
152
152
case Right (NextMessage (msgToSend, timeoutTime)) =>
153
153
rlpxConnection.sendMessage(msgToSend)
154
154
val newTimeout = scheduler.scheduleOnce(timeoutTime, self, ResponseTimeout )
155
- context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries)
155
+ context become processingHandshaking(handshaker, remoteNodeId, rlpxConnection, newTimeout, numRetries)
156
156
157
157
case Left (HandshakeSuccess (handshakeResult)) =>
158
158
rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager .AddKnownNode (uri) }
159
- context become new HandshakedPeer (rlpxConnection, handshakeResult).receive
159
+ context become new HandshakedPeer (remoteNodeId, rlpxConnection, handshakeResult).receive
160
160
unstashAll()
161
161
162
162
case Left (HandshakeFailure (reason)) =>
@@ -244,8 +244,9 @@ class PeerActor[R <: HandshakeResult](
244
244
stash()
245
245
}
246
246
247
- class HandshakedPeer (rlpxConnection : RLPxConnection , handshakeResult : R ) {
247
+ class HandshakedPeer (remoteNodeId : ByteString , rlpxConnection : RLPxConnection , handshakeResult : R ) {
248
248
249
+ val peer : Peer = Peer (peerAddress, self, incomingConnection, Some (remoteNodeId))
249
250
peerEventBus ! Publish (PeerHandshakeSuccessful (peer, handshakeResult))
250
251
251
252
/**
0 commit comments