[ETCM-573] Add metrics on block imports #919

Merged 1 commit on Feb 17, 2021

13,981 changes: 7,052 additions & 6,929 deletions docker/mantis/grafana/provisioning/dashboards/mantis-dashboard.json

Large diffs are not rendered by default.

51 changes: 48 additions & 3 deletions src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala
@@ -1,8 +1,12 @@
package io.iohk.ethereum.sync

import com.typesafe.config.ConfigValueFactory
import io.iohk.ethereum.FreeSpecBase
import io.iohk.ethereum.metrics.{Metrics, MetricsConfig}
import io.iohk.ethereum.sync.util.RegularSyncItSpecUtils.FakePeer
import io.iohk.ethereum.sync.util.SyncCommonItSpec._
import io.iohk.ethereum.utils.Config
import io.prometheus.client.CollectorRegistry
import monix.execution.Scheduler
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
@@ -12,6 +16,12 @@ import scala.concurrent.duration._
class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAll {
implicit val testScheduler = Scheduler.fixedPool("test", 16)

override def beforeAll(): Unit = {
Metrics.configure(
MetricsConfig(Config.config.withValue("metrics.enabled", ConfigValueFactory.fromAnyRef(true)))
)
}

override def afterAll(): Unit = {
testScheduler.shutdown()
testScheduler.awaitTermination(120.second)
@@ -20,12 +30,12 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
"peer 2 should sync to the top of peer1 blockchain" - {
"given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
val blockNumer: Int = 2000
val blockNumber: Int = 2000
for {
_ <- peer1.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer1.importBlocksUntil(blockNumber)(IdentityUpdate)
_ <- peer2.startRegularSync()
_ <- peer2.connectToPeers(Set(peer1.node))
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer)
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumber)
} yield {
assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash)
}
@@ -96,4 +106,39 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
}
}

"A metric about mining a new block should be available" in customTestCaseResourceM(
Contributor:

Very nice that you tested the metric 👍 (Though this was perhaps also out of necessity?)

Contributor Author:

Yes, it was. I need to understand better what was happening. I started by modifying an existing test and then created this one, which is a bit more specific. I also found it a bit weird that there are no tests on metrics at all.

FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
import MantisRegistries._

val minedMetricBefore = sampleMetric(TimerCountMetric, MinedBlockPropagation)
val defaultMetricBefore = sampleMetric(TimerCountMetric, DefaultBlockPropagation)

for {
_ <- peer1.startRegularSync()
_ <- peer1.mineNewBlocks(10.milliseconds, 1)(IdentityUpdate)

Note that it may be nice to create a value class for number of blocks:

case class NumBlocks(underlying: Int) extends AnyVal

This would make invocations like this a little clearer (at present, the only way to figure out what this 1 signifies is to track down the definition of mineNewBlocks), add a bit of documentation, and help avoid mix-ups between different kinds of Ints.

Of course there is a tiny bit of extra code for this and whether it's worth paying that cost depends on how central the concept is in the codebase. Just using a regular Int probably isn't a big deal in tests like this.

(Same thinking applies to block number in waitForRegularSyncLoadLastBlock.)
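
A minimal sketch of the value-class idea above. The `mineNewBlocks` and `waitForBlock` functions here are hypothetical stand-ins, not the real helpers in RegularSyncItSpecUtils, and `BlockNumber` is an assumed name for the block-number wrapper; only the shape of the call sites is the point.

```scala
import scala.concurrent.duration._

object ValueClassSketch {
  // Value classes: distinct types at compile time, usually no wrapper object at runtime.
  // Both names are illustrative and do not exist in the codebase.
  final case class NumBlocks(underlying: Int) extends AnyVal
  final case class BlockNumber(underlying: BigInt) extends AnyVal

  // Hypothetical stand-in for mineNewBlocks; it only describes the request.
  def mineNewBlocks(delay: FiniteDuration, count: NumBlocks): String =
    s"mining ${count.underlying} block(s), one every ${delay.toMillis} ms"

  // Hypothetical stand-in for waitForRegularSyncLoadLastBlock:
  // true when the best known block number equals the target.
  def waitForBlock(target: BlockNumber, best: BigInt): Boolean =
    best == target.underlying

  def main(args: Array[String]): Unit = {
    // The call site now says what the literal 1 means.
    println(mineNewBlocks(10.milliseconds, NumBlocks(1)))
    println(waitForBlock(BlockNumber(1), best = BigInt(1)))
    // Passing the wrong kind of number is rejected by the compiler:
    // mineNewBlocks(10.milliseconds, BlockNumber(1)) // type mismatch
  }
}
```

The trade-off is the one described above: a couple of extra definitions in exchange for self-documenting call sites and a compile-time guard against swapping a block count for a block number.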

Eric E

_ <- peer1.waitForRegularSyncLoadLastBlock(1)
_ <- peer2.startRegularSync()
_ <- peer2.connectToPeers(Set(peer1.node))
_ <- peer2.waitForRegularSyncLoadLastBlock(1)
} yield {

val minedMetricAfter = sampleMetric(TimerCountMetric, MinedBlockPropagation).doubleValue()
val defaultMetricAfter = sampleMetric(TimerCountMetric, DefaultBlockPropagation).doubleValue()

minedMetricAfter shouldBe minedMetricBefore + 1.0d
defaultMetricAfter shouldBe defaultMetricBefore + 1.0d
}
}

object MantisRegistries {
val TimerCountMetric = "app_regularsync_blocks_propagation_timer_seconds_count"
val DefaultBlockPropagation = "DefaultBlockPropagation"
val MinedBlockPropagation = "MinedBlockPropagation"
def sampleMetric(metricName: String, blockType: String): Double = CollectorRegistry.defaultRegistry.getSampleValue(
metricName,
Array("blocktype"),
Array(blockType)
)
}
}
@@ -105,8 +105,8 @@ object RegularSyncItSpecUtils {
}
}

def waitForRegularSyncLoadLastBlock(blockNumer: BigInt): Task[Boolean] = {
retryUntilWithDelay(Task(bl.getBestBlockNumber() == blockNumer), 1.second, 90) { isDone => isDone }
def waitForRegularSyncLoadLastBlock(blockNumber: BigInt): Task[Boolean] = {
retryUntilWithDelay(Task(bl.getBestBlockNumber() == blockNumber), 1.second, 90) { isDone => isDone }
}

def mineNewBlock(
@@ -116,9 +116,9 @@ object RegularSyncItSpecUtils {
val currentWeight = bl
.getChainWeightByHash(block.hash)
.getOrElse(throw new RuntimeException(s"ChainWeight by hash: ${block.hash} doesn't exist"))
val currentWolrd = getMptForBlock(block)
val currentWorld = getMptForBlock(block)
Contributor:

🎉

val (newBlock, _, _) =
createChildBlock(block, currentWeight, currentWolrd, plusDifficulty)(updateWorldForBlock)
createChildBlock(block, currentWeight, currentWorld, plusDifficulty)(updateWorldForBlock)
regularSync ! SyncProtocol.MinedBlock(newBlock)
}

@@ -175,7 +175,7 @@ class FastSync(

case ResponseReceived(peer, BlockHeaders(blockHeaders), timeTaken) =>
log.info("*** Received {} block headers in {} ms ***", blockHeaders.size, timeTaken)
SyncMetrics.setBlockHeadersDownloadTime(timeTaken)
FastSyncMetrics.setBlockHeadersDownloadTime(timeTaken)

requestedHeaders.get(peer).foreach { requestedNum =>
removeRequestHandler(sender())
@@ -190,7 +190,7 @@ class FastSync(

case ResponseReceived(peer, BlockBodies(blockBodies), timeTaken) =>
log.info("Received {} block bodies in {} ms", blockBodies.size, timeTaken)
SyncMetrics.setBlockBodiesDownloadTime(timeTaken)
FastSyncMetrics.setBlockBodiesDownloadTime(timeTaken)

val requestedBodies = requestedBlockBodies.getOrElse(sender(), Nil)
requestedBlockBodies -= sender()
@@ -199,7 +199,7 @@ class FastSync(

case ResponseReceived(peer, Receipts(receipts), timeTaken) =>
log.info("Received {} receipts in {} ms", receipts.size, timeTaken)
SyncMetrics.setBlockReceiptsDownloadTime(timeTaken)
FastSyncMetrics.setBlockReceiptsDownloadTime(timeTaken)

val requestedHashes = requestedReceipts.getOrElse(sender(), Nil)
requestedReceipts -= sender()
@@ -656,7 +656,7 @@ class FastSync(
}

def processSyncing(): Unit = {
SyncMetrics.measure(syncState)
FastSyncMetrics.measure(syncState)
if (fullySynced) {
finish()
} else {
@@ -676,7 +676,7 @@ class FastSync(

def finish(): Unit = {
val totalTime = totalMinutesTaken()
SyncMetrics.setFastSyncTotalTimeGauge(totalTime.toDouble)
FastSyncMetrics.setFastSyncTotalTimeGauge(totalTime.toDouble)
log.info("Total time taken for FastSync was {} minutes", totalTime)
log.info("Block synchronization in fast mode finished, switching to regular mode")

@@ -1,13 +1,13 @@
package io.iohk.ethereum.blockchain.sync.fast

import java.util.concurrent.atomic.AtomicLong
import com.google.common.util.concurrent.AtomicDouble
import io.iohk.ethereum.blockchain.sync.fast.FastSync.SyncState
import io.iohk.ethereum.metrics.MetricsContainer

import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration.MILLISECONDS

object SyncMetrics extends MetricsContainer {
object FastSyncMetrics extends MetricsContainer {

private final val PivotBlockNumberGauge =
metrics.registry.gauge("fastsync.block.pivotBlock.number.gauge", new AtomicDouble(0d))
@@ -21,6 +21,7 @@ import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
import monix.eval.Task
import monix.execution.Scheduler

import scala.concurrent.duration._

class SyncStateSchedulerActor(
@@ -64,7 +65,7 @@ class SyncStateSchedulerActor(
def handleRequestResults: Receive = {
case ResponseReceived(peer, nodeData: NodeData, timeTaken) =>
log.info("Received {} state nodes in {} ms", nodeData.values.size, timeTaken)
SyncMetrics.setMptStateDownloadTime(timeTaken)
FastSyncMetrics.setMptStateDownloadTime(timeTaken)

context unwatch (sender())
self ! RequestData(nodeData, peer)