Skip to content

Commit 8fcf51a

Browse files
committed
Merge remote-tracking branch 'origin/develop' into etcm-472/missing-fields-receipts
2 parents 2ceeacd + e76316d commit 8fcf51a

27 files changed

+752
-19
lines changed

src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import io.iohk.ethereum.network.{
3232
KnownNodesManager,
3333
PeerEventBusActor,
3434
PeerManagerActor,
35+
PeerStatisticsActor,
3536
ServerActor
3637
}
3738
import io.iohk.ethereum.nodebuilder.PruningConfigBuilder
@@ -149,6 +150,8 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
149150
override val updateNodesInterval: FiniteDuration = 20.seconds
150151
override val shortBlacklistDuration: FiniteDuration = 1.minute
151152
override val longBlacklistDuration: FiniteDuration = 3.minutes
153+
override val statSlotDuration: FiniteDuration = 1.minute
154+
override val statSlotCount: Int = 30
152155
}
153156

154157
lazy val peerEventBus = system.actorOf(PeerEventBusActor.props, "peer-event-bus")
@@ -167,12 +170,16 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
167170

168171
lazy val authHandshaker: AuthHandshaker = AuthHandshaker(nodeKey, secureRandom)
169172

173+
lazy val peerStatistics =
174+
system.actorOf(PeerStatisticsActor.props(peerEventBus, slotDuration = 1.minute, slotCount = 30))
175+
170176
lazy val peerManager: ActorRef = system.actorOf(
171177
PeerManagerActor.props(
172178
peerDiscoveryManager,
173179
Config.Network.peer,
174180
peerEventBus,
175181
knownNodesManager,
182+
peerStatistics,
176183
handshaker,
177184
authHandshaker,
178185
EthereumMessageDecoder,

src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import io.iohk.ethereum.ledger.{InMemoryWorldStateProxy, InMemoryWorldStateProxy
1818
import io.iohk.ethereum.mpt.MptNode
1919
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
2020
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
21+
import io.iohk.ethereum.network.PeerStatisticsActor
2122
import io.iohk.ethereum.network.discovery.DiscoveryConfig
2223
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
2324
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
@@ -60,6 +61,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
6061
override val updateNodesInterval: FiniteDuration = 20.seconds
6162
override val shortBlacklistDuration: FiniteDuration = 1.minute
6263
override val longBlacklistDuration: FiniteDuration = 3.minutes
64+
override val statSlotDuration: FiniteDuration = 1.minute
65+
override val statSlotCount: Int = 30
6366
}
6467

6568
val actorSystem = ActorSystem("mantis_system")
@@ -91,17 +94,20 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
9194

9295
val peerMessageBus = actorSystem.actorOf(PeerEventBusActor.props)
9396

97+
val peerStatistics = actorSystem.actorOf(PeerStatisticsActor.props(peerMessageBus, 1.minute, 30))
98+
9499
val peerManager = actorSystem.actorOf(
95100
PeerManagerActor.props(
96101
peerDiscoveryManager = actorSystem.deadLetters, // TODO: fixme
97102
peerConfiguration = peerConfig,
98103
peerMessageBus = peerMessageBus,
104+
peerStatistics = peerStatistics,
99105
knownNodesManager = actorSystem.deadLetters, // TODO: fixme
100106
handshaker = handshaker,
101107
authHandshaker = authHandshaker,
102108
messageDecoder = EthereumMessageDecoder,
103-
discoveryConfig,
104-
Config.Network.protocolVersion
109+
discoveryConfig = discoveryConfig,
110+
bestProtocolVersion = Config.Network.protocolVersion
105111
),
106112
"peer-manager"
107113
)

src/main/resources/application.conf

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ mantis {
22
# Optionally augment the client ID sent in Hello messages.
33
client-identity = null
44

5-
# Version string (reported by an RPC method)
6-
client-version = "mantis/v2.0"
7-
85
# Base directory where all the data used by the node is stored, including blockchain data and private keys
96
datadir = ${user.home}"/.mantis/"${mantis.blockchains.network}
107

@@ -163,6 +160,12 @@ mantis {
163160
# Peer which disconnect during tcp connection becouse of other reasons will not be retried for this long duration
164161
# other reasons include: timeout during connection, wrong protocol, incompatible network
165162
long-blacklist-duration = 30.minutes
163+
164+
# Resolution of moving window of peer statistics.
165+
# Will be multiplied by `stat-slot-count` to give the overall length of peer statistics availability.
166+
stat-slot-duration = 1.minute
167+
# How many slots of peer statistics to keep in the moving window, e.g. 60 * 1.minute to keep stats for the last hour with 1 minute resolution.
168+
stat-slot-count = 60
166169
}
167170

168171
rpc {
@@ -605,7 +608,8 @@ mantis {
605608

606609
akka {
607610
loggers = ["akka.event.slf4j.Slf4jLogger"]
608-
loglevel = "DEBUG"
611+
# Not using ${logging.logs-level} because it might be set to TRACE, which our version of Akka doesn't have.
612+
loglevel = "INFO"
609613
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
610614
logger-startup-timeout = 30s
611615
log-dead-letters = off
@@ -659,4 +663,7 @@ logging {
659663

660664
# Logs filename
661665
logs-file = "mantis"
666+
667+
# Logs level
668+
logs-level = "INFO"
662669
}

src/main/resources/logback.xml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<load key="logging.json-output" as="ASJSON"/>
99
<load key="logging.logs-dir" as="LOGSDIR"/>
1010
<load key="logging.logs-file" as="LOGSFILENAME"/>
11+
<load key="logging.logs-level" as="LOGSLEVEL"/>
1112

1213
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
1314
<encoder>
@@ -43,7 +44,7 @@
4344

4445
<appender name="METRICS" class="io.prometheus.client.logback.InstrumentedAppender" />
4546

46-
<root level="DEBUG">
47+
<root level="${LOGSLEVEL}">
4748
<if condition='p("ASJSON").contains("true")'>
4849
<then>
4950
<appender-ref ref="STASH" />
@@ -58,9 +59,9 @@
5859

5960
<logger name="io.netty" level="WARN"/>
6061
<logger name="io.iohk.scalanet" level="INFO" />
61-
<logger name="io.iohk.ethereum.network.rlpx.RLPxConnectionHandler" level="DEBUG" />
6262
<logger name="io.iohk.ethereum.blockchain.sync.SyncController" level="INFO" />
63-
<logger name="io.iohk.ethereum.network.PeerActor" level="DEBUG" />
63+
<logger name="io.iohk.ethereum.network.PeerActor" level="${LOGSLEVEL}" />
64+
<logger name="io.iohk.ethereum.network.rlpx.RLPxConnectionHandler" level="${LOGSLEVEL}" />
6465
<logger name="io.iohk.ethereum.vm.VM" level="OFF" />
6566

6667
</configuration>

src/main/scala/io/iohk/ethereum/domain/BlockBody.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ case class BlockBody(transactionList: Seq[SignedTransaction], uncleNodesList: Se
77
override def toString: String =
88
s"BlockBody{ transactionList: $transactionList, uncleNodesList: $uncleNodesList }"
99

10+
def toShortString: String =
11+
s"BlockBody { transactionsList: ${transactionList.map(_.hashAsHexString)}, uncleNodesList: ${uncleNodesList.map(_.hashAsHexString)} }"
12+
1013
lazy val numberOfTxs: Int = transactionList.size
1114

1215
lazy val numberOfUncles: Int = uncleNodesList.size

src/main/scala/io/iohk/ethereum/network/PeerActor.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import io.iohk.ethereum.network.p2p._
1717
import io.iohk.ethereum.network.p2p.messages.WireProtocol._
1818
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
1919
import io.iohk.ethereum.network.rlpx.{AuthHandshaker, RLPxConnectionHandler}
20+
import io.iohk.ethereum.utils.Logger
2021
import org.bouncycastle.util.encoders.Hex
2122

2223
/**
@@ -244,6 +245,23 @@ class PeerActor[R <: HandshakeResult](
244245
stash()
245246
}
246247

248+
// The actor logs incoming messages, which can be quite verbose even for DEBUG mode.
249+
// ActorLogging doesn't support TRACE, but we can push more details if trace is enabled using the normal logging facilites.
250+
object MessageLogger extends Logger {
251+
val isTraceEnabled = {
252+
var enabled = false
253+
log.whenTraceEnabled({ enabled = true })
254+
enabled
255+
}
256+
def logMessage(peerId: PeerId, message: Message): Unit =
257+
// Sometimes potentially seeing the full block in the result is useful.
258+
if (isTraceEnabled) {
259+
log.trace(s"Received message: {} from $peerId", message)
260+
} else {
261+
log.debug(s"Received message: {} from $peerId", message.toShortString)
262+
}
263+
}
264+
247265
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {
248266

249267
val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId))
@@ -258,7 +276,7 @@ class PeerActor[R <: HandshakeResult](
258276
handleTerminated(rlpxConnection, 0, Handshaked) orElse {
259277

260278
case RLPxConnectionHandler.MessageReceived(message) =>
261-
log.debug(s"Received message: {} from $peerId", message)
279+
MessageLogger.logMessage(peerId, message)
262280
peerEventBus ! Publish(MessageFromPeer(message, peer.id))
263281

264282
case DisconnectPeer(reason) =>

src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class PeerManagerActor(
3131
peerDiscoveryManager: ActorRef,
3232
peerConfiguration: PeerConfiguration,
3333
knownNodesManager: ActorRef,
34+
peerStatistics: ActorRef,
3435
peerFactory: (ActorContext, InetSocketAddress, Boolean) => ActorRef,
3536
discoveryConfig: DiscoveryConfig,
3637
externalSchedulerOpt: Option[Scheduler] = None
@@ -52,8 +53,6 @@ class PeerManagerActor(
5253
import PeerManagerActor._
5354
import akka.pattern.{ask, pipe}
5455

55-
private type PeerMap = Map[PeerId, Peer]
56-
5756
implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {
5857
def outgoingConnectionDemand: Int =
5958
peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
@@ -339,6 +338,7 @@ object PeerManagerActor {
339338
peerConfiguration: PeerConfiguration,
340339
peerMessageBus: ActorRef,
341340
knownNodesManager: ActorRef,
341+
peerStatistics: ActorRef,
342342
handshaker: Handshaker[R],
343343
authHandshaker: AuthHandshaker,
344344
messageDecoder: MessageDecoder,
@@ -362,6 +362,7 @@ object PeerManagerActor {
362362
peerDiscoveryManager,
363363
peerConfiguration,
364364
knownNodesManager,
365+
peerStatistics,
365366
peerFactory = factory,
366367
discoveryConfig
367368
)
@@ -410,6 +411,8 @@ object PeerManagerActor {
410411
val updateNodesInterval: FiniteDuration
411412
val shortBlacklistDuration: FiniteDuration
412413
val longBlacklistDuration: FiniteDuration
414+
val statSlotDuration: FiniteDuration
415+
val statSlotCount: Int
413416
}
414417

415418
trait FastSyncHostConfiguration {
@@ -444,5 +447,4 @@ object PeerManagerActor {
444447
case class OutgoingConnectionAlreadyHandled(uri: URI) extends ConnectionError
445448

446449
case class PeerAddress(value: String) extends BlackListId
447-
448450
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.iohk.ethereum.network
2+
3+
import cats._
4+
import cats.implicits._
5+
6+
case class PeerStat(
7+
responsesReceived: Int,
8+
requestsReceived: Int,
9+
firstSeenTimeMillis: Option[Long],
10+
lastSeenTimeMillis: Option[Long]
11+
)
12+
object PeerStat {
13+
val empty: PeerStat = PeerStat(0, 0, None, None)
14+
15+
private def mergeOpt[A, B](x: A, y: A)(f: A => Option[B])(g: (B, B) => B): Option[B] = {
16+
val (mx, my) = (f(x), f(y))
17+
(mx, my).mapN(g) orElse mx orElse my
18+
}
19+
20+
implicit val monoid: Monoid[PeerStat] =
21+
Monoid.instance(
22+
empty,
23+
(a, b) =>
24+
PeerStat(
25+
responsesReceived = a.responsesReceived + b.responsesReceived,
26+
requestsReceived = a.requestsReceived + b.requestsReceived,
27+
firstSeenTimeMillis = mergeOpt(a, b)(_.firstSeenTimeMillis)(math.min),
28+
lastSeenTimeMillis = mergeOpt(a, b)(_.lastSeenTimeMillis)(math.max)
29+
)
30+
)
31+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.iohk.ethereum.network
2+
3+
import akka.actor._
4+
import io.iohk.ethereum.network.PeerEventBusActor._
5+
import io.iohk.ethereum.network.p2p.messages.Codes
6+
import java.time.Clock
7+
import scala.concurrent.duration.FiniteDuration
8+
9+
class PeerStatisticsActor(
10+
peerEventBus: ActorRef,
11+
var maybeStats: Option[TimeSlotStats[PeerId, PeerStat]]
12+
) extends Actor {
13+
import PeerStatisticsActor._
14+
15+
override def preStart(): Unit = {
16+
// Subscribe to messages received from handshaked peers to maintain stats.
17+
peerEventBus ! Subscribe(MessageSubscriptionClassifier)
18+
// Removing peers is an optimisation to free space, but eventually the stats would be overwritten anyway.
19+
peerEventBus ! Subscribe(SubscriptionClassifier.PeerDisconnectedClassifier(PeerSelector.AllPeers))
20+
}
21+
22+
def receive: Receive = handlePeerEvents orElse handleStatsRequests
23+
24+
private def handlePeerEvents: Receive = {
25+
case PeerEvent.MessageFromPeer(msg, peerId) =>
26+
val now = System.currentTimeMillis()
27+
val obs = PeerStat(
28+
responsesReceived = if (ResponseCodes(msg.code)) 1 else 0,
29+
requestsReceived = if (RequestCodes(msg.code)) 1 else 0,
30+
firstSeenTimeMillis = Some(now),
31+
lastSeenTimeMillis = Some(now)
32+
)
33+
maybeStats = maybeStats.map(_.add(peerId, obs))
34+
35+
case PeerEvent.PeerDisconnected(peerId) =>
36+
maybeStats = maybeStats.map(_.remove(peerId))
37+
}
38+
39+
private def handleStatsRequests: Receive = {
40+
case GetStatsForAll(window) =>
41+
val stats = maybeStats.map(_.getAll(Some(window))).getOrElse(Map.empty)
42+
sender ! StatsForAll(stats)
43+
44+
case GetStatsForPeer(window, peerId) =>
45+
val stats = maybeStats.map(_.get(peerId, Some(window))).getOrElse(PeerStat.empty)
46+
sender ! StatsForPeer(peerId, stats)
47+
}
48+
}
49+
50+
object PeerStatisticsActor {
51+
def props(peerEventBus: ActorRef, slotDuration: FiniteDuration, slotCount: Int): Props =
52+
Props {
53+
implicit val clock = Clock.systemUTC()
54+
new PeerStatisticsActor(peerEventBus, TimeSlotStats[PeerId, PeerStat](slotDuration, slotCount))
55+
}
56+
57+
case class GetStatsForAll(window: FiniteDuration)
58+
case class StatsForAll(stats: Map[PeerId, PeerStat])
59+
case class GetStatsForPeer(window: FiniteDuration, peerId: PeerId)
60+
case class StatsForPeer(peerId: PeerId, stat: PeerStat)
61+
62+
val ResponseCodes = Set(
63+
Codes.NewBlockCode,
64+
Codes.NewBlockHashesCode,
65+
Codes.SignedTransactionsCode,
66+
Codes.BlockHeadersCode,
67+
Codes.BlockBodiesCode,
68+
Codes.BlockHashesFromNumberCode,
69+
Codes.NodeDataCode,
70+
Codes.ReceiptsCode
71+
)
72+
73+
val RequestCodes = Set(
74+
Codes.GetBlockHeadersCode,
75+
Codes.GetBlockBodiesCode,
76+
Codes.GetNodeDataCode,
77+
Codes.GetReceiptsCode
78+
)
79+
80+
val MessageSubscriptionClassifier =
81+
SubscriptionClassifier.MessageClassifier(
82+
messageCodes = RequestCodes union ResponseCodes,
83+
peerSelector = PeerSelector.AllPeers
84+
)
85+
}

0 commit comments

Comments
 (0)