Skip to content

Commit afa0c46

Browse files
committed
Merge remote-tracking branch 'origin/develop' into ETCM-446-connection-limit-ranges
2 parents d6a0d76 + e76316d commit afa0c46

22 files changed

+144
-62
lines changed

src/main/resources/application.conf

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ mantis {
22
# Optionally augment the client ID sent in Hello messages.
33
client-identity = null
44

5-
# Version string (reported by an RPC method)
6-
client-version = "mantis/v2.0"
7-
85
# Base directory where all the data used by the node is stored, including blockchain data and private keys
96
datadir = ${user.home}"/.mantis/"${mantis.blockchains.network}
107

@@ -623,7 +620,8 @@ mantis {
623620

624621
akka {
625622
loggers = ["akka.event.slf4j.Slf4jLogger"]
626-
loglevel = "DEBUG"
623+
# Not using ${logging.logs-level} because it might be set to TRACE, which our version of Akka doesn't have.
624+
loglevel = "INFO"
627625
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
628626
logger-startup-timeout = 30s
629627
log-dead-letters = off
@@ -677,4 +675,7 @@ logging {
677675

678676
# Logs filename
679677
logs-file = "mantis"
678+
679+
# Logs level
680+
logs-level = "INFO"
680681
}

src/main/resources/logback.xml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<load key="logging.json-output" as="ASJSON"/>
99
<load key="logging.logs-dir" as="LOGSDIR"/>
1010
<load key="logging.logs-file" as="LOGSFILENAME"/>
11+
<load key="logging.logs-level" as="LOGSLEVEL"/>
1112

1213
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
1314
<encoder>
@@ -43,7 +44,7 @@
4344

4445
<appender name="METRICS" class="io.prometheus.client.logback.InstrumentedAppender" />
4546

46-
<root level="DEBUG">
47+
<root level="${LOGSLEVEL}">
4748
<if condition='p("ASJSON").contains("true")'>
4849
<then>
4950
<appender-ref ref="STASH" />
@@ -58,9 +59,9 @@
5859

5960
<logger name="io.netty" level="WARN"/>
6061
<logger name="io.iohk.scalanet" level="INFO" />
61-
<logger name="io.iohk.ethereum.network.rlpx.RLPxConnectionHandler" level="DEBUG" />
6262
<logger name="io.iohk.ethereum.blockchain.sync.SyncController" level="INFO" />
63-
<logger name="io.iohk.ethereum.network.PeerActor" level="DEBUG" />
63+
<logger name="io.iohk.ethereum.network.PeerActor" level="${LOGSLEVEL}" />
64+
<logger name="io.iohk.ethereum.network.rlpx.RLPxConnectionHandler" level="${LOGSLEVEL}" />
6465
<logger name="io.iohk.ethereum.vm.VM" level="OFF" />
6566

6667
</configuration>

src/main/scala/io/iohk/ethereum/domain/BlockBody.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ case class BlockBody(transactionList: Seq[SignedTransaction], uncleNodesList: Se
77
override def toString: String =
88
s"BlockBody{ transactionList: $transactionList, uncleNodesList: $uncleNodesList }"
99

10+
def toShortString: String =
11+
s"BlockBody { transactionsList: ${transactionList.map(_.hashAsHexString)}, uncleNodesList: ${uncleNodesList.map(_.hashAsHexString)} }"
12+
1013
lazy val numberOfTxs: Int = transactionList.size
1114

1215
lazy val numberOfUncles: Int = uncleNodesList.size

src/main/scala/io/iohk/ethereum/network/PeerActor.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import io.iohk.ethereum.network.p2p._
1717
import io.iohk.ethereum.network.p2p.messages.WireProtocol._
1818
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
1919
import io.iohk.ethereum.network.rlpx.{AuthHandshaker, RLPxConnectionHandler}
20+
import io.iohk.ethereum.utils.Logger
2021
import org.bouncycastle.util.encoders.Hex
2122

2223
/**
@@ -244,6 +245,23 @@ class PeerActor[R <: HandshakeResult](
244245
stash()
245246
}
246247

248+
// The actor logs incoming messages, which can be quite verbose even for DEBUG mode.
249+
// ActorLogging doesn't support TRACE, but we can push more details if trace is enabled using the normal logging facilites.
250+
object MessageLogger extends Logger {
251+
val isTraceEnabled = {
252+
var enabled = false
253+
log.whenTraceEnabled({ enabled = true })
254+
enabled
255+
}
256+
def logMessage(peerId: PeerId, message: Message): Unit =
257+
// Sometimes potentially seeing the full block in the result is useful.
258+
if (isTraceEnabled) {
259+
log.trace(s"Received message: {} from $peerId", message)
260+
} else {
261+
log.debug(s"Received message: {} from $peerId", message.toShortString)
262+
}
263+
}
264+
247265
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {
248266

249267
val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId))
@@ -258,7 +276,7 @@ class PeerActor[R <: HandshakeResult](
258276
handleTerminated(rlpxConnection, 0, Handshaked) orElse {
259277

260278
case RLPxConnectionHandler.MessageReceived(message) =>
261-
log.debug(s"Received message: {} from $peerId", message)
279+
MessageLogger.logMessage(peerId, message)
262280
peerEventBus ! Publish(MessageFromPeer(message, peer.id))
263281

264282
case DisconnectPeer(reason) =>

src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ class PeerManagerActor(
323323
/** Disconnect some incoming connections so we can free up slots. */
324324
private def pruneIncomingPeers(
325325
connectedPeers: ConnectedPeers,
326-
stats: Map[PeerId, PeerStatisticsActor.Stat]
326+
stats: Map[PeerId, PeerStat]
327327
): ConnectedPeers = {
328328
val pruneCount = PeerManagerActor.numberOfIncomingConnectionsToPrune(connectedPeers, peerConfiguration)
329329
val now = System.currentTimeMillis
@@ -535,7 +535,7 @@ object PeerManagerActor {
535535
/** Assign a priority to peers that we can use to order connections,
536536
* with lower priorities being the ones to prune first.
537537
*/
538-
def prunePriority(stats: Map[PeerId, PeerStatisticsActor.Stat], currentTimeMillis: Long)(peerId: PeerId): Double = {
538+
def prunePriority(stats: Map[PeerId, PeerStat], currentTimeMillis: Long)(peerId: PeerId): Double = {
539539
stats
540540
.get(peerId)
541541
.flatMap { stat =>
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.iohk.ethereum.network
2+
3+
import cats._
4+
import cats.implicits._
5+
6+
case class PeerStat(
7+
responsesReceived: Int,
8+
requestsReceived: Int,
9+
firstSeenTimeMillis: Option[Long],
10+
lastSeenTimeMillis: Option[Long]
11+
)
12+
object PeerStat {
13+
val empty: PeerStat = PeerStat(0, 0, None, None)
14+
15+
private def mergeOpt[A, B](x: A, y: A)(f: A => Option[B])(g: (B, B) => B): Option[B] = {
16+
val (mx, my) = (f(x), f(y))
17+
(mx, my).mapN(g) orElse mx orElse my
18+
}
19+
20+
implicit val monoid: Monoid[PeerStat] =
21+
Monoid.instance(
22+
empty,
23+
(a, b) =>
24+
PeerStat(
25+
responsesReceived = a.responsesReceived + b.responsesReceived,
26+
requestsReceived = a.requestsReceived + b.requestsReceived,
27+
firstSeenTimeMillis = mergeOpt(a, b)(_.firstSeenTimeMillis)(math.min),
28+
lastSeenTimeMillis = mergeOpt(a, b)(_.lastSeenTimeMillis)(math.max)
29+
)
30+
)
31+
}

src/main/scala/io/iohk/ethereum/network/PeerStatisticsActor.scala

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,30 @@
11
package io.iohk.ethereum.network
22

33
import akka.actor._
4-
import cats._
5-
import cats.implicits._
64
import io.iohk.ethereum.network.PeerEventBusActor._
75
import io.iohk.ethereum.network.p2p.messages.Codes
86
import java.time.Clock
97
import scala.concurrent.duration.FiniteDuration
10-
import cats.kernel.Monoid
118

129
class PeerStatisticsActor(
1310
peerEventBus: ActorRef,
14-
var maybeStats: Option[TimeSlotStats[PeerId, PeerStatisticsActor.Stat]]
11+
var maybeStats: Option[TimeSlotStats[PeerId, PeerStat]]
1512
) extends Actor {
1613
import PeerStatisticsActor._
1714

18-
// Subscribe to messages received from handshaked peers to maintain stats.
19-
peerEventBus ! Subscribe(MessageSubscriptionClassifier)
20-
// Removing peers is an optimisation to free space, but eventually the stats would be overwritten anyway.
21-
peerEventBus ! Subscribe(SubscriptionClassifier.PeerDisconnectedClassifier(PeerSelector.AllPeers))
15+
override def preStart(): Unit = {
16+
// Subscribe to messages received from handshaked peers to maintain stats.
17+
peerEventBus ! Subscribe(MessageSubscriptionClassifier)
18+
// Removing peers is an optimisation to free space, but eventually the stats would be overwritten anyway.
19+
peerEventBus ! Subscribe(SubscriptionClassifier.PeerDisconnectedClassifier(PeerSelector.AllPeers))
20+
}
2221

2322
def receive: Receive = handlePeerEvents orElse handleStatsRequests
2423

2524
private def handlePeerEvents: Receive = {
2625
case PeerEvent.MessageFromPeer(msg, peerId) =>
2726
val now = System.currentTimeMillis()
28-
val obs = Stat(
27+
val obs = PeerStat(
2928
responsesReceived = if (ResponseCodes(msg.code)) 1 else 0,
3029
requestsReceived = if (RequestCodes(msg.code)) 1 else 0,
3130
firstSeenTimeMillis = Some(now),
@@ -43,49 +42,22 @@ class PeerStatisticsActor(
4342
sender ! StatsForAll(stats)
4443

4544
case GetStatsForPeer(window, peerId) =>
46-
val stats = maybeStats.map(_.get(peerId, Some(window))).getOrElse(Stat.empty)
45+
val stats = maybeStats.map(_.get(peerId, Some(window))).getOrElse(PeerStat.empty)
4746
sender ! StatsForPeer(peerId, stats)
4847
}
4948
}
5049

5150
object PeerStatisticsActor {
52-
case class Stat(
53-
responsesReceived: Int,
54-
requestsReceived: Int,
55-
firstSeenTimeMillis: Option[Long],
56-
lastSeenTimeMillis: Option[Long]
57-
)
58-
object Stat {
59-
val empty: Stat = Stat(0, 0, None, None)
60-
61-
private def mergeOpt[A, B](x: A, y: A)(f: A => Option[B])(g: (B, B) => B): Option[B] = {
62-
val (mx, my) = (f(x), f(y))
63-
(mx, my).mapN(g) orElse mx orElse my
64-
}
65-
66-
implicit val monoid: Monoid[Stat] =
67-
Monoid.instance(
68-
empty,
69-
(a, b) =>
70-
Stat(
71-
responsesReceived = a.responsesReceived + b.responsesReceived,
72-
requestsReceived = a.requestsReceived + b.requestsReceived,
73-
firstSeenTimeMillis = mergeOpt(a, b)(_.firstSeenTimeMillis)(math.min),
74-
lastSeenTimeMillis = mergeOpt(a, b)(_.lastSeenTimeMillis)(math.max)
75-
)
76-
)
77-
}
78-
7951
def props(peerEventBus: ActorRef, slotDuration: FiniteDuration, slotCount: Int): Props =
8052
Props {
8153
implicit val clock = Clock.systemUTC()
82-
new PeerStatisticsActor(peerEventBus, TimeSlotStats[PeerId, Stat](slotDuration, slotCount))
54+
new PeerStatisticsActor(peerEventBus, TimeSlotStats[PeerId, PeerStat](slotDuration, slotCount))
8355
}
8456

8557
case class GetStatsForAll(window: FiniteDuration)
86-
case class StatsForAll(stats: Map[PeerId, Stat])
58+
case class StatsForAll(stats: Map[PeerId, PeerStat])
8759
case class GetStatsForPeer(window: FiniteDuration, peerId: PeerId)
88-
case class StatsForPeer(peerId: PeerId, stat: Stat)
60+
case class StatsForPeer(peerId: PeerId, stat: PeerStat)
8961

9062
val ResponseCodes = Set(
9163
Codes.NewBlockCode,

src/main/scala/io/iohk/ethereum/network/TimeSlotStats.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import cats._
44
import cats.implicits._
55
import java.time.Clock
66
import scala.concurrent.duration.{Duration, FiniteDuration}
7+
import scala.annotation.tailrec
78

89
/** Track statistics over time a fixed size timewindow. */
910
class TimeSlotStats[K, V: Monoid] private (
@@ -59,6 +60,7 @@ class TimeSlotStats[K, V: Monoid] private (
5960
private def fold[A](init: A, window: Duration)(f: (A, Map[K, V]) => A) = {
6061
val (start, end) = slotRange(currentTimeMillis, window)
6162

63+
@tailrec
6264
def loop(idx: Int, acc: List[Map[K, V]]): List[Map[K, V]] = {
6365
val entry = buffer(idx)
6466
if (entry.slotId < start || end < entry.slotId)

src/main/scala/io/iohk/ethereum/network/p2p/Message.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ object Message {
1111

1212
trait Message {
1313
def code: Int
14+
def toShortString: String
1415
}
1516

1617
trait MessageSerializable extends Message {
@@ -21,7 +22,6 @@ trait MessageSerializable extends Message {
2122
def toBytes: Array[Byte]
2223

2324
def underlyingMsg: Message
24-
2525
}
2626

2727
trait MessageDecoder { self =>

src/main/scala/io/iohk/ethereum/network/p2p/MessageSerializableImplicit.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ abstract class MessageSerializableImplicit[T <: Message](val msg: T) extends Mes
1313

1414
override def hashCode(): Int = msg.hashCode()
1515

16+
override def toShortString: String = msg.toShortString
1617
}

src/main/scala/io/iohk/ethereum/network/p2p/messages/CommonMessages.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ object CommonMessages {
6767
s"genesisHash: ${Hex.toHexString(genesisHash.toArray[Byte])}," +
6868
s"}"
6969

70+
override def toShortString: String = toString
7071
override def code: Int = Codes.StatusCode
7172
}
7273

@@ -121,6 +122,13 @@ object CommonMessages {
121122
s"totalDifficulty: $totalDifficulty" +
122123
s"}"
123124

125+
override def toShortString: String =
126+
s"NewBlock { " +
127+
s"code: $code, " +
128+
s"block.header: ${block.header}, " +
129+
s"totalDifficulty: $totalDifficulty" +
130+
s"}"
131+
124132
override def code: Int = Codes.NewBlockCode
125133
}
126134

@@ -193,5 +201,7 @@ object CommonMessages {
193201

194202
case class SignedTransactions(txs: Seq[SignedTransaction]) extends Message {
195203
override def code: Int = Codes.SignedTransactionsCode
204+
override def toShortString: String =
205+
s"SignedTransactions { txs: ${txs.map(_.hashAsHexString)} }"
196206
}
197207
}

src/main/scala/io/iohk/ethereum/network/p2p/messages/PV61.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.iohk.ethereum.network.p2p.{Message, MessageSerializableImplicit}
55
import io.iohk.ethereum.rlp.RLPImplicitConversions._
66
import io.iohk.ethereum.rlp.RLPImplicits._
77
import io.iohk.ethereum.rlp._
8+
import org.bouncycastle.util.encoders.Hex
89

910
object PV61 {
1011

@@ -29,6 +30,8 @@ object PV61 {
2930

3031
case class NewBlockHashes(hashes: Seq[ByteString]) extends Message {
3132
override def code: Int = Codes.NewBlockHashesCode
33+
override def toShortString: String =
34+
s"NewBlockHashes { hashes: ${hashes.map(h => Hex.toHexString(h.toArray[Byte]))} } "
3235
}
3336

3437
object BlockHashesFromNumber {
@@ -51,6 +54,9 @@ object PV61 {
5154

5255
case class BlockHashesFromNumber(number: BigInt, maxBlocks: BigInt) extends Message {
5356
override def code: Int = Codes.BlockHashesFromNumberCode
57+
override def toString: String =
58+
s"BlockHashesFromNumber { number: $number, maxBlocks: $maxBlocks }"
59+
override def toShortString: String = toString
5460
}
5561

5662
}

src/main/scala/io/iohk/ethereum/network/p2p/messages/PV62.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ object PV62 {
6060

6161
case class NewBlockHashes(hashes: Seq[BlockHash]) extends Message {
6262
override def code: Int = Codes.NewBlockHashesCode
63+
override def toString: String =
64+
s"NewBlockHashes { " +
65+
s"hashes: ${hashes} " +
66+
s"}"
67+
override def toShortString: String = toString
6368
}
6469

6570
object GetBlockHeaders {
@@ -102,6 +107,8 @@ object PV62 {
102107
s"skip: $skip " +
103108
s"reverse: $reverse " +
104109
s"}"
110+
111+
override def toShortString: String = toString
105112
}
106113

107114
object BlockBodies {
@@ -123,6 +130,8 @@ object PV62 {
123130

124131
case class BlockBodies(bodies: Seq[BlockBody]) extends Message {
125132
val code: Int = Codes.BlockBodiesCode
133+
override def toShortString: String =
134+
s"BlockBodies { bodies: ${bodies.map(_.toShortString)} }"
126135
}
127136

128137
object BlockHeaders {
@@ -148,6 +157,8 @@ object PV62 {
148157

149158
case class BlockHeaders(headers: Seq[BlockHeader]) extends Message {
150159
override def code: Int = Codes.BlockHeadersCode
160+
override def toShortString: String =
161+
s"BlockHeaders { headers: ${headers.map(_.hashAsHexString)} }"
151162
}
152163

153164
object GetBlockBodies {
@@ -176,5 +187,6 @@ object PV62 {
176187
s"GetBlockBodies { " +
177188
s"hashes: ${hashes.map(h => Hex.toHexString(h.toArray[Byte]))} " +
178189
s"}"
190+
override def toShortString: String = toString
179191
}
180192
}

0 commit comments

Comments
 (0)