Skip to content

Commit aafb637

Browse files
author
Aurélien Richez
authored
[ETCM-829] rewrite healthcheck (#984)
* [ETCM-829] Rewrite healthcheck: - Adds predicates to existing health checks - Adds a way to set additional info in the healthcheck - Adds updateStatus - Checks if the server seems stuck for too long - Adds syncStatus
1 parent b81d183 commit aafb637

File tree

10 files changed

+253
-80
lines changed

10 files changed

+253
-80
lines changed

src/main/resources/conf/base.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,15 @@ mantis {
253253
peer-manager-timeout = 5.seconds
254254
}
255255

256+
health {
257+
# If the best fetching block number stays the same for longer than this,
258+
# the healthcheck will consider the client to be stuck and return an error
259+
no-update-duration-threshold = 30.minutes
260+
# If the difference between the best stored block number and the best known block number
261+
# is less than or equal to this value, the healthcheck will report that the client is synced.
262+
syncing-status-threshold = 10
263+
}
264+
256265
miner-active-timeout = 5.seconds
257266
}
258267
}

src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetJsonRpcHealthCheck.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ class FaucetJsonRpcHealthCheck(faucetRpcService: FaucetRpcService) extends JsonR
99

1010
protected def mainService: String = "faucet health"
1111

12-
final val statusHC = JsonRpcHealthcheck("status", faucetRpcService.status(StatusRequest()))
12+
final val statusHC = JsonRpcHealthcheck.fromServiceResponse("status", faucetRpcService.status(StatusRequest()))
1313

1414
override def healthCheck(): Task[HealthcheckResponse] = {
15-
val statusF = statusHC()
15+
val statusF = statusHC.map(_.toResult)
1616
val responseF = statusF.map(check => HealthcheckResponse(List(check)))
1717

1818
handleResponse(responseF)

src/main/scala/io/iohk/ethereum/healthcheck/Healthcheck.scala

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
11
package io.iohk.ethereum.healthcheck
22

3-
final case class HealthcheckResult private (description: String, status: String, error: Option[String]) {
4-
assert(
5-
status == HealthcheckStatus.OK && error.isEmpty || status == HealthcheckStatus.ERROR && error.isDefined
6-
)
3+
final case class HealthcheckResult private (
4+
name: String,
5+
status: String,
6+
info: Option[String]
7+
) {
78

89
def isOK: Boolean = status == HealthcheckStatus.OK
910
}
1011

1112
object HealthcheckResult {
12-
def apply(description: String, error: Option[String]): HealthcheckResult =
13+
14+
def ok(name: String, info: Option[String] = None): HealthcheckResult =
15+
new HealthcheckResult(
16+
name = name,
17+
status = HealthcheckStatus.OK,
18+
info = info
19+
)
20+
21+
def error(name: String, error: String): HealthcheckResult =
1322
new HealthcheckResult(
14-
description = description,
15-
status = error.fold(HealthcheckStatus.OK)(_ => HealthcheckStatus.ERROR),
16-
error = error
23+
name = name,
24+
status = HealthcheckStatus.ERROR,
25+
info = Some(error)
1726
)
1827
}
Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,51 @@
11
package io.iohk.ethereum.jsonrpc
22

3-
import io.iohk.ethereum.healthcheck.Healthcheck
3+
import io.iohk.ethereum.healthcheck.HealthcheckResult
4+
import monix.eval.Task
5+
6+
final case class JsonRpcHealthcheck[Response](
7+
name: String,
8+
healthCheck: Either[String, Response],
9+
info: Option[String] = None
10+
) {
11+
12+
def toResult: HealthcheckResult = {
13+
healthCheck
14+
.fold(
15+
HealthcheckResult.error(name, _),
16+
result => HealthcheckResult.ok(name, info)
17+
)
18+
}
19+
20+
def withPredicate(message: String)(predicate: Response => Boolean): JsonRpcHealthcheck[Response] =
21+
copy(healthCheck = healthCheck.filterOrElse(predicate, message))
22+
23+
def collect[T](message: String)(collectFn: PartialFunction[Response, T]): JsonRpcHealthcheck[T] =
24+
copy(
25+
name = name,
26+
healthCheck = healthCheck.flatMap(collectFn.lift(_).toRight(message))
27+
)
28+
29+
def withInfo(getInfo: Response => String): JsonRpcHealthcheck[Response] =
30+
copy(info = healthCheck.toOption.map(getInfo))
31+
}
432

533
object JsonRpcHealthcheck {
6-
type T[R] = Healthcheck[JsonRpcError, R]
734

8-
def apply[R](description: String, f: ServiceResponse[R]): T[R] = Healthcheck(description, f)
35+
def fromServiceResponse[Response](name: String, f: ServiceResponse[Response]): Task[JsonRpcHealthcheck[Response]] =
36+
f.map(result =>
37+
JsonRpcHealthcheck(
38+
name,
39+
result.left.map[String](_.message)
40+
)
41+
).onErrorHandle(t => JsonRpcHealthcheck(name, Left(t.getMessage())))
42+
43+
def fromTask[Response](name: String, f: Task[Response]): Task[JsonRpcHealthcheck[Response]] =
44+
f.map(result =>
45+
JsonRpcHealthcheck(
46+
name,
47+
Right(result)
48+
)
49+
).onErrorHandle(t => JsonRpcHealthcheck(name, Left(t.getMessage())))
50+
951
}
Lines changed: 131 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,150 @@
11
package io.iohk.ethereum.jsonrpc
22

33
import io.iohk.ethereum.healthcheck.HealthcheckResponse
4-
import io.iohk.ethereum.jsonrpc.EthBlocksService.BlockByNumberRequest
4+
import io.iohk.ethereum.jsonrpc.EthBlocksService.{
5+
BlockByNumberRequest,
6+
BlockByNumberResponse,
7+
BestBlockNumberRequest,
8+
BestBlockNumberResponse
9+
}
510
import io.iohk.ethereum.jsonrpc.EthInfoService._
11+
import io.iohk.ethereum.jsonrpc.NodeJsonRpcHealthChecker.JsonRpcHealthConfig
612
import io.iohk.ethereum.jsonrpc.NetService._
13+
import io.iohk.ethereum.jsonrpc.AkkaTaskOps._
14+
import com.typesafe.config.{Config => TypesafeConfig}
715
import monix.eval.Task
16+
import java.time.Instant
17+
import java.time.Duration
18+
import akka.actor.ActorRef
19+
import io.iohk.ethereum.blockchain.sync.SyncProtocol
20+
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status._
21+
import akka.util.Timeout
22+
import io.iohk.ethereum.utils.AsyncConfig
823

924
class NodeJsonRpcHealthChecker(
1025
netService: NetService,
11-
ethBlocksService: EthBlocksService
26+
ethBlocksService: EthBlocksService,
27+
syncingController: ActorRef,
28+
config: JsonRpcHealthConfig,
29+
asyncConfig: AsyncConfig
1230
) extends JsonRpcHealthChecker {
1331

32+
implicit val askTimeout: Timeout = asyncConfig.askTimeout
33+
1434
protected def mainService: String = "node health"
1535

16-
final val listeningHC = JsonRpcHealthcheck("listening", netService.listening(NetService.ListeningRequest()))
17-
final val peerCountHC = JsonRpcHealthcheck("peerCount", netService.peerCount(PeerCountRequest()))
18-
final val earliestBlockHC = JsonRpcHealthcheck(
19-
"earliestBlock",
20-
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Earliest, fullTxs = true))
21-
)
22-
final val latestBlockHC = JsonRpcHealthcheck(
23-
"latestBlock",
24-
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Latest, fullTxs = true))
25-
)
26-
final val pendingBlockHC = JsonRpcHealthcheck(
27-
"pendingBlock",
28-
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Pending, fullTxs = true))
29-
)
36+
private var previousBestFetchingBlock: Option[(Instant, BigInt)] = None
37+
38+
private val peerCountHC = JsonRpcHealthcheck
39+
.fromServiceResponse("peerCount", netService.peerCount(PeerCountRequest()))
40+
.map(
41+
_.withInfo(_.value.toString)
42+
.withPredicate("peer count is 0")(_.value > 0)
43+
)
44+
45+
private val storedBlockHC = JsonRpcHealthcheck
46+
.fromServiceResponse(
47+
"bestStoredBlock",
48+
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Latest, fullTxs = true))
49+
)
50+
.map(
51+
_.collect("No block is currently stored") { case EthBlocksService.BlockByNumberResponse(Some(v)) => v }
52+
.withInfo(_.number.toString)
53+
)
54+
55+
private val bestKnownBlockHC = JsonRpcHealthcheck
56+
.fromServiceResponse("bestKnownBlock", getBestKnownBlockTask)
57+
.map(_.withInfo(_.toString))
58+
59+
private val fetchingBlockHC = JsonRpcHealthcheck
60+
.fromServiceResponse("bestFetchingBlock", getBestFetchingBlockTask)
61+
.map(
62+
_.collect("no best fetching block") { case Some(v) => v }
63+
.withInfo(_.toString)
64+
)
65+
66+
private val updateStatusHC = JsonRpcHealthcheck
67+
.fromServiceResponse("updateStatus", getBestFetchingBlockTask)
68+
.map(
69+
_.collect("no best fetching block") { case Some(v) => v }
70+
.withPredicate(s"block did not change for more than ${config.noUpdateDurationThreshold.getSeconds()} s")(
71+
blockNumberHasChanged
72+
)
73+
)
74+
75+
private val syncStatusHC =
76+
JsonRpcHealthcheck
77+
.fromTask("syncStatus", syncingController.askFor[SyncProtocol.Status](SyncProtocol.GetStatus))
78+
.map(_.withInfo {
79+
case NotSyncing => "STARTING"
80+
case s: Syncing if isConsideredSyncing(s.blocksProgress) => "SYNCING"
81+
case _ => "SYNCED"
82+
})
3083

3184
override def healthCheck(): Task[HealthcheckResponse] = {
32-
val listeningF = listeningHC()
33-
val peerCountF = peerCountHC()
34-
val earliestBlockF = earliestBlockHC()
35-
val latestBlockF = latestBlockHC()
36-
val pendingBlockF = pendingBlockHC()
85+
val responseTask = Task
86+
.parSequence(
87+
List(
88+
peerCountHC,
89+
storedBlockHC,
90+
bestKnownBlockHC,
91+
fetchingBlockHC,
92+
updateStatusHC,
93+
syncStatusHC
94+
)
95+
)
96+
.map(_.map(_.toResult))
97+
.map(HealthcheckResponse)
98+
99+
handleResponse(responseTask)
100+
}
101+
102+
private def blockNumberHasChanged(newBestFetchingBlock: BigInt) =
103+
previousBestFetchingBlock match {
104+
case Some((firstSeenAt, value)) if value == newBestFetchingBlock =>
105+
Instant.now().minus(config.noUpdateDurationThreshold).isBefore(firstSeenAt)
106+
case _ =>
107+
previousBestFetchingBlock = Some((Instant.now(), newBestFetchingBlock))
108+
true
109+
}
110+
111+
/** Use the sync target reported by the sync controller while syncing; otherwise fall back to ethBlocksService.bestBlockNumber */
112+
private def getBestKnownBlockTask =
113+
syncingController
114+
.askFor[SyncProtocol.Status](SyncProtocol.GetStatus)
115+
.flatMap {
116+
case NotSyncing | SyncDone =>
117+
ethBlocksService
118+
.bestBlockNumber(EthBlocksService.BestBlockNumberRequest())
119+
.map(_.map(_.bestBlockNumber))
120+
case Syncing(_, progress, _) => Task.now(Right(progress.target))
121+
}
122+
123+
/** Use the current sync progress reported by the sync controller while syncing; otherwise fall back to the pending block number from ethBlocksService */
124+
private def getBestFetchingBlockTask =
125+
syncingController
126+
.askFor[SyncProtocol.Status](SyncProtocol.GetStatus)
127+
.flatMap {
128+
case NotSyncing | SyncDone =>
129+
ethBlocksService
130+
.getBlockByNumber(BlockByNumberRequest(BlockParam.Pending, fullTxs = true))
131+
.map(_.map(_.blockResponse.map(_.number)))
132+
case Syncing(_, progress, _) => Task.now(Right(Some(progress.current)))
133+
}
134+
135+
private def isConsideredSyncing(progress: Progress) =
136+
progress.target - progress.current > config.syncingStatusThreshold
137+
138+
}
37139

38-
val allChecksF = List(listeningF, peerCountF, earliestBlockF, latestBlockF, pendingBlockF)
39-
val responseF = Task.sequence(allChecksF).map(HealthcheckResponse)
140+
object NodeJsonRpcHealthChecker {
141+
case class JsonRpcHealthConfig(noUpdateDurationThreshold: Duration, syncingStatusThreshold: Int)
40142

41-
handleResponse(responseF)
143+
object JsonRpcHealthConfig {
144+
def apply(rpcConfig: TypesafeConfig): JsonRpcHealthConfig =
145+
JsonRpcHealthConfig(
146+
noUpdateDurationThreshold = rpcConfig.getDuration("health.no-update-duration-threshold"),
147+
syncingStatusThreshold = rpcConfig.getInt("health.syncing-status-threshold")
148+
)
42149
}
43150
}

src/main/scala/io/iohk/ethereum/jsonrpc/server/controllers/JsonRpcBaseController.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.iohk.ethereum.jsonrpc.serialization.{JsonEncoder, JsonMethodDecoder}
99
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
1010
import io.iohk.ethereum.jsonrpc.server.ipc.JsonRpcIpcServer.JsonRpcIpcServerConfig
1111
import io.iohk.ethereum.jsonrpc.{JsonRpcControllerMetrics, JsonRpcError, JsonRpcRequest, JsonRpcResponse}
12+
import io.iohk.ethereum.jsonrpc.NodeJsonRpcHealthChecker.JsonRpcHealthConfig
1213
import io.iohk.ethereum.utils.Logger
1314
import monix.eval.Task
1415
import org.json4s.JsonDSL._
@@ -120,6 +121,7 @@ object JsonRpcBaseController {
120121
def minerActiveTimeout: FiniteDuration
121122
def httpServerConfig: JsonRpcHttpServerConfig
122123
def ipcServerConfig: JsonRpcIpcServerConfig
124+
def healthConfig: JsonRpcHealthConfig
123125
}
124126

125127
object JsonRpcConfig {
@@ -143,6 +145,7 @@ object JsonRpcBaseController {
143145

144146
override val httpServerConfig: JsonRpcHttpServerConfig = JsonRpcHttpServerConfig(mantisConfig)
145147
override val ipcServerConfig: JsonRpcIpcServerConfig = JsonRpcIpcServerConfig(mantisConfig)
148+
override val healthConfig: JsonRpcHealthConfig = JsonRpcHealthConfig(rpcConfig)
146149
}
147150
}
148151
}

src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,8 +589,19 @@ trait JSONRpcControllerBuilder {
589589
}
590590

591591
trait JSONRpcHealthcheckerBuilder {
592-
this: NetServiceBuilder with EthBlocksServiceBuilder =>
593-
lazy val jsonRpcHealthChecker: JsonRpcHealthChecker = new NodeJsonRpcHealthChecker(netService, ethBlocksService)
592+
this: NetServiceBuilder
593+
with EthBlocksServiceBuilder
594+
with JSONRpcConfigBuilder
595+
with AsyncConfigBuilder
596+
with SyncControllerBuilder =>
597+
lazy val jsonRpcHealthChecker: JsonRpcHealthChecker =
598+
new NodeJsonRpcHealthChecker(
599+
netService,
600+
ethBlocksService,
601+
syncController,
602+
jsonRpcConfig.healthConfig,
603+
asyncConfig
604+
)
594605
}
595606

596607
trait JSONRpcHttpServerBuilder {

src/test/scala/io/iohk/ethereum/jsonrpc/JsonRpcControllerSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class JsonRpcControllerSpec
101101
override def minerActiveTimeout: FiniteDuration = ???
102102
override def httpServerConfig: JsonRpcHttpServer.JsonRpcHttpServerConfig = ???
103103
override def ipcServerConfig: JsonRpcIpcServer.JsonRpcIpcServerConfig = ???
104+
override def healthConfig: NodeJsonRpcHealthChecker.JsonRpcHealthConfig = ???
104105
}
105106

106107
val ethRpcRequest = newJsonRpcRequest("eth_protocolVersion")

0 commit comments

Comments
 (0)