Skip to content

Commit 7672926

Browse files
authored
Merge pull request #100 from input-output-hk/ETCM-168-discovery-part4
ETCM-168: ETC testnet fixes
2 parents 4d4a3da + 1ab041f commit 7672926

File tree

6 files changed

+58
-32
lines changed

6 files changed

+58
-32
lines changed

scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/EthereumNodeRecord.scala

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ object EthereumNodeRecord {
2222
case class Content(
2323
// Nodes should increment this number whenever their properties change, like their address, and re-publish.
2424
seq: Long,
25+
// Normally clients treat the values as RLP, however we don't have access to the RLP types here, hence it's just bytes.
2526
attrs: SortedMap[ByteVector, ByteVector]
2627
)
2728

@@ -52,6 +53,26 @@ object EthereumNodeRecord {
5253

5354
/** IPv6-specific UDP port, big endian integer */
5455
val udp6 = key("udp6")
56+
57+
/** The keys above have pre-defined meaning, but there can be arbitrary entries in the map. */
58+
val Predefined: Set[ByteVector] = Set(id, secp256k1, ip, tcp, udp, ip6, tcp6, udp6)
59+
}
60+
61+
def apply(signature: Signature, seq: Long, attrs: (ByteVector, ByteVector)*): EthereumNodeRecord =
62+
EthereumNodeRecord(
63+
signature,
64+
EthereumNodeRecord.Content(seq, SortedMap(attrs: _*))
65+
)
66+
67+
def apply(privateKey: PrivateKey, seq: Long, attrs: (ByteVector, ByteVector)*)(
68+
implicit sigalg: SigAlg,
69+
codec: Codec[Content]
70+
): Attempt[EthereumNodeRecord] = {
71+
val content = EthereumNodeRecord.Content(seq, SortedMap(attrs: _*))
72+
codec.encode(content).map { data =>
73+
val sig = sigalg.removeRecoveryId(sigalg.sign(privateKey, data))
74+
EthereumNodeRecord(sig, content)
75+
}
5576
}
5677

5778
def fromNode(node: Node, privateKey: PrivateKey, seq: Long)(
@@ -64,20 +85,15 @@ object EthereumNodeRecord {
6485
else
6586
(Keys.ip, Keys.tcp, Keys.udp)
6687

67-
val content = Content(
68-
seq,
69-
SortedMap(
70-
Keys.id -> ByteVector("v4".getBytes(StandardCharsets.UTF_8)),
71-
Keys.secp256k1 -> sigalg.compressPublicKey(sigalg.toPublicKey(privateKey)).toByteVector,
72-
ipKey -> ByteVector(node.address.ip.getAddress),
73-
tcpKey -> ByteVector.fromInt(node.address.tcpPort),
74-
udpKey -> ByteVector.fromInt(node.address.udpPort)
75-
)
88+
val attrs = List(
89+
Keys.id -> ByteVector("v4".getBytes(StandardCharsets.UTF_8)),
90+
Keys.secp256k1 -> sigalg.compressPublicKey(sigalg.toPublicKey(privateKey)).toByteVector,
91+
ipKey -> ByteVector(node.address.ip.getAddress),
92+
tcpKey -> ByteVector.fromInt(node.address.tcpPort),
93+
udpKey -> ByteVector.fromInt(node.address.udpPort)
7694
)
77-
codec.encode(content).map { data =>
78-
val sig = sigalg.removeRecoveryId(sigalg.sign(privateKey, data))
79-
EthereumNodeRecord(sig, content)
80-
}
95+
96+
apply(privateKey, seq, attrs: _*)
8197
}
8298

8399
def validateSignature(

scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/Node.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ object Node {
4242
tryParse[InetAddress](key)(bytes => InetAddress.getByAddress(bytes.toArray))
4343

4444
def tryParsePort(key: ByteVector): Option[Int] =
45-
tryParse[Int](key)(bytes => bytes.toInt())
45+
tryParse[Int](key)(bytes => bytes.toInt(signed = false))
4646

4747
for {
4848
ip <- tryParseIP(Keys.ip6) orElse tryParseIP(Keys.ip)

scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetwork.scala

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import scala.util.control.{NonFatal, NoStackTrace}
2121
import scodec.bits.BitVector
2222
import io.iohk.scalanet.discovery.ethereum.v4.Payload.Neighbors
2323
import java.net.InetSocketAddress
24+
import io.iohk.scalanet.discovery.hash.Keccak256
2425

2526
/** Present a stateless facade implementing the RPC methods
2627
* that correspond to the discovery protocol messages on top
@@ -68,9 +69,9 @@ object DiscoveryNetwork {
6869
import DiscoveryRPC.ENRSeq
6970
import Payload._
7071

71-
private val expirationMillis = config.messageExpiration.toMillis
72-
private val maxClockDriftMillis = config.maxClockDrift.toMillis
73-
private val currentTimeMillis = clock.realTime(MILLISECONDS)
72+
private val expirationSeconds = config.messageExpiration.toSeconds
73+
private val maxClockDriftSeconds = config.maxClockDrift.toSeconds
74+
private val currentTimeSeconds = clock.realTime(SECONDS)
7475

7576
private val maxNeighborsPerPacket = getMaxNeighborsPerPacket
7677

@@ -117,7 +118,7 @@ object DiscoveryNetwork {
117118
.toIterant
118119
.mapEval {
119120
case MessageReceived(packet) =>
120-
currentTimeMillis.flatMap { timestamp =>
121+
currentTimeSeconds.flatMap { timestamp =>
121122
Packet.unpack(packet) match {
122123
case Attempt.Successful((payload, remotePublicKey)) =>
123124
payload match {
@@ -126,7 +127,7 @@ object DiscoveryNetwork {
126127
Task.unit
127128

128129
case p: Payload.HasExpiration[_] if isExpired(p, timestamp) =>
129-
Task(logger.debug(s"Ignoring expired request from ${channel.to}"))
130+
Task(logger.debug(s"Ignoring expired request from ${channel.to}; ${p.expiration} < $timestamp"))
130131

131132
case p: Payload.Request =>
132133
handleRequest(handler, channel, remotePublicKey, packet.hash, p)
@@ -214,7 +215,7 @@ object DiscoveryNetwork {
214215
private def setExpiration(payload: Payload): Task[Payload] = {
215216
payload match {
216217
case p: Payload.HasExpiration[_] =>
217-
currentTimeMillis.map(t => p.withExpiration(t + expirationMillis))
218+
currentTimeSeconds.map(t => p.withExpiration(t + expirationSeconds))
218219
case p =>
219220
Task.pure(p)
220221
}
@@ -230,7 +231,7 @@ object DiscoveryNetwork {
230231
* our expiration time to 1 hour wouldn't help in this case.
231232
*/
232233
private def isExpired(payload: HasExpiration[_], now: Long): Boolean =
233-
payload.expiration < now - maxClockDriftMillis
234+
payload.expiration < now - maxClockDriftSeconds
234235

235236
/** Ping a peer. */
236237
override val ping = (peer: Peer[A]) =>
@@ -241,8 +242,14 @@ object DiscoveryNetwork {
241242
Ping(version = 4, from = localNodeAddress, to = toNodeAddress(peer.address), 0, localEnrSeq)
242243
)
243244
.flatMap { packet =>
245+
// Workaround for 1.10 Parity nodes that send back the hash of the Ping data
246+
// rather than the hash of the whole packet (over signature + data).
247+
// https://github.com/paritytech/parity/issues/8038
248+
// https://github.com/ethereumproject/go-ethereum/issues/312
249+
val dataHash = Keccak256(packet.data)
250+
244251
channel.collectFirstResponse(peer.id) {
245-
case Pong(_, pingHash, _, maybeRemoteEnrSeq) if pingHash == packet.hash =>
252+
case Pong(_, pingHash, _, maybeRemoteEnrSeq) if pingHash == packet.hash || pingHash == dataHash =>
246253
maybeRemoteEnrSeq
247254
}
248255
}
@@ -317,7 +324,7 @@ object DiscoveryNetwork {
317324
case MessageReceived(packet) => packet
318325
}
319326
.mapEval { packet =>
320-
currentTimeMillis.flatMap { timestamp =>
327+
currentTimeSeconds.flatMap { timestamp =>
321328
Packet.unpack(packet) match {
322329
case Attempt.Successful((payload, remotePublicKey)) =>
323330
payload match {
@@ -329,7 +336,9 @@ object DiscoveryNetwork {
329336
Task.pure(None)
330337

331338
case p: Payload.HasExpiration[_] if isExpired(p, timestamp) =>
332-
Task(logger.debug(s"Ignoring expired response from ${channel.to}")).as(None)
339+
Task(
340+
logger.debug(s"Ignoring expired response from ${channel.to}; ${p.expiration} < $timestamp")
341+
).as(None)
333342

334343
case p: Payload.Response =>
335344
Task.pure(Some(p))

scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,7 @@ object DiscoveryService {
815815
val tryEnroll = for {
816816
nodeId <- stateRef.get.map(_.node.id)
817817
bootstrapPeers = config.knownPeers.toList.map(toPeer).filterNot(_.id == nodeId)
818-
maybeBootstrapEnrs <- bootstrapPeers.traverse(fetchEnr(_, delay = true))
818+
maybeBootstrapEnrs <- Task.parTraverseN(config.kademliaAlpha)(bootstrapPeers)(fetchEnr(_, delay = true))
819819
result <- if (maybeBootstrapEnrs.exists(_.isDefined)) {
820820
lookup(nodeId).as(true)
821821
} else {

scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/Payload.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ object Payload {
1818
sealed trait Response extends Payload
1919

2020
trait HasExpiration[T <: Payload] {
21-
// Absolute UNIX timestamp.
21+
// Absolute UNIX timestamp: seconds since epoch.
2222
def expiration: Long
2323
def withExpiration(at: Long): T
2424
}
@@ -28,7 +28,6 @@ object Payload {
2828
version: Int,
2929
from: Node.Address,
3030
to: Node.Address,
31-
// Absolute UNIX timestamp.
3231
expiration: Long,
3332
// Current ENR sequence number of the sender.
3433
enrSeq: Option[Long]

scalanet/discovery/test/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetworkSpec.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {
644644
_ <- network.startHandling(handleWithSome)
645645
channel <- peerGroup.createServerChannel(from = remoteAddress)
646646
(request: Payload) = requestMap(rpc) match {
647-
case p: Payload.HasExpiration[_] => p.withExpiration(System.currentTimeMillis - 5000)
647+
case p: Payload.HasExpiration[_] => p.withExpiration(currentTimeSeconds - 5)
648648
case p => p
649649
}
650650
_ <- channel.sendPayloadToSUT(request, remotePrivateKey)
@@ -746,7 +746,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {
746746
}
747747

748748
def packetSizeOfNNeighbors(n: Int) = {
749-
val neighbours = Neighbors(List.fill(n)(randomIPv6Node), System.currentTimeMillis)
749+
val neighbours = Neighbors(List.fill(n)(randomIPv6Node), expiration = currentTimeSeconds)
750750
val (_, privateKey) = sigalg.newKeyPair
751751
val packet = Packet.pack(neighbours, privateKey).require
752752
val packetSize = packet.hash.size + packet.signature.size + packet.data.size
@@ -789,6 +789,8 @@ object DiscoveryNetworkSpec extends Matchers {
789789
maxClockDrift = Duration.Zero
790790
)
791791

792+
def currentTimeSeconds = System.currentTimeMillis / 1000
793+
792794
trait Fixture {
793795
// Implement `test` to assert something.
794796
def test: Task[Assertion]
@@ -843,14 +845,14 @@ object DiscoveryNetworkSpec extends Matchers {
843845
).runSyncUnsafe()
844846

845847
def assertExpirationSet(expiration: Long) =
846-
expiration shouldBe (System.currentTimeMillis + config.messageExpiration.toMillis) +- 3000
848+
expiration shouldBe (currentTimeSeconds + config.messageExpiration.toSeconds) +- 3
847849

848850
def validExpiration =
849-
System.currentTimeMillis + config.messageExpiration.toMillis
851+
currentTimeSeconds + config.messageExpiration.toSeconds
850852

851853
// Anything in the past is invalid.
852854
def invalidExpiration =
853-
System.currentTimeMillis - 1
855+
currentTimeSeconds - 1
854856

855857
implicit class ChannelOps(channel: MockChannel[InetSocketAddress, Packet]) {
856858
def sendPayloadToSUT(

0 commit comments

Comments
 (0)