Skip to content

Commit e0c22cf

Browse files
author
Nicolás Tallar
authored
Merge branch 'develop' into fix/checkpointing-metrics
2 parents fce7360 + 07e617c commit e0c22cf

File tree

12 files changed

+104
-34
lines changed

12 files changed

+104
-34
lines changed

src/main/scala/io/iohk/ethereum/faucet/FaucetConfig.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.iohk.ethereum.faucet
22

3-
import com.typesafe.config.{ConfigFactory, Config}
3+
import com.typesafe.config.{Config, ConfigFactory}
44
import io.iohk.ethereum.domain.Address
55

66
import scala.concurrent.duration.{FiniteDuration, _}
@@ -11,36 +11,50 @@ trait FaucetConfigBuilder {
1111
lazy val faucetConfig: FaucetConfig = FaucetConfig(rawConfig)
1212
}
1313

14+
case class RpcClientConfig(
15+
address: String,
16+
timeout: FiniteDuration
17+
)
18+
19+
object RpcClientConfig {
20+
def apply(rpcClientConfig: Config): RpcClientConfig = {
21+
22+
RpcClientConfig(
23+
address = rpcClientConfig.getString("rpc-address"),
24+
timeout = rpcClientConfig.getDuration("timeout").toMillis.millis
25+
)
26+
}
27+
}
28+
1429
case class FaucetConfig(
1530
walletAddress: Address,
1631
walletPassword: String,
1732
txGasPrice: BigInt,
1833
txGasLimit: BigInt,
1934
txValue: BigInt,
20-
rpcAddress: String,
35+
rpcClient: RpcClientConfig,
2136
keyStoreDir: String,
2237
minRequestInterval: FiniteDuration,
2338
handlerTimeout: FiniteDuration,
24-
responseTimeout: FiniteDuration,
39+
actorCommunicationMargin: FiniteDuration,
2540
supervisor: SupervisorConfig,
2641
shutdownTimeout: FiniteDuration
2742
)
2843

2944
object FaucetConfig {
3045
def apply(typesafeConfig: Config): FaucetConfig = {
3146
val faucetConfig = typesafeConfig.getConfig("faucet")
32-
3347
FaucetConfig(
3448
walletAddress = Address(faucetConfig.getString("wallet-address")),
3549
walletPassword = faucetConfig.getString("wallet-password"),
3650
txGasPrice = faucetConfig.getLong("tx-gas-price"),
3751
txGasLimit = faucetConfig.getLong("tx-gas-limit"),
3852
txValue = faucetConfig.getLong("tx-value"),
39-
rpcAddress = faucetConfig.getString("rpc-client.rpc-address"),
53+
rpcClient = RpcClientConfig(faucetConfig.getConfig("rpc-client")),
4054
keyStoreDir = faucetConfig.getString("keystore-dir"),
4155
minRequestInterval = faucetConfig.getDuration("min-request-interval").toMillis.millis,
4256
handlerTimeout = faucetConfig.getDuration("handler-timeout").toMillis.millis,
43-
responseTimeout = faucetConfig.getDuration("response-timeout").toMillis.millis,
57+
actorCommunicationMargin = faucetConfig.getDuration("actor-communication-margin").toMillis.millis,
4458
supervisor = SupervisorConfig(faucetConfig),
4559
shutdownTimeout = faucetConfig.getDuration("shutdown-timeout").toMillis.millis
4660
)

src/main/scala/io/iohk/ethereum/faucet/SupervisorConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ object SupervisorConfig {
1717

1818
SupervisorConfig(
1919
supervisorConfig.getDuration("min-backoff").toMillis.millis,
20-
supervisorConfig.getDuration("man-backoff").toMillis.millis,
20+
supervisorConfig.getDuration("max-backoff").toMillis.millis,
2121
supervisorConfig.getDouble("random-factor"),
2222
supervisorConfig.getDuration("auto-reset").toMillis.millis,
2323
supervisorConfig.getInt("attempts"),

src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetBuilder.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ trait FaucetRpcServiceBuilder {
3737
)
3838

3939
val walletRpcClient: WalletRpcClient =
40-
new WalletRpcClient(faucetConfig.rpcAddress, () => sslContext("faucet.rpc-client"))
40+
new WalletRpcClient(
41+
faucetConfig.rpcClient.address,
42+
faucetConfig.rpcClient.timeout,
43+
() => sslContext("faucet.rpc-client")
44+
)
4145
val walletService = new WalletService(walletRpcClient, keyStore, faucetConfig)
4246
val faucetSupervisor: FaucetSupervisor = new FaucetSupervisor(walletService, faucetConfig, shutdown)(system)
4347
val faucetRpcService = new FaucetRpcService(faucetConfig)

src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetRpcService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class FaucetRpcService(config: FaucetConfig)(implicit system: ActorSystem)
1616
with FaucetHandlerSelector
1717
with Logger {
1818

19-
implicit lazy val actorTimeout: Timeout = Timeout(config.responseTimeout)
19+
implicit lazy val actorTimeout: Timeout = Timeout(config.actorCommunicationMargin + config.rpcClient.timeout)
2020

2121
def sendFunds(sendFundsRequest: SendFundsRequest): ServiceResponse[SendFundsResponse] =
2222
selectFaucetHandler()

src/main/scala/io/iohk/ethereum/faucet/jsonrpc/WalletRpcClient.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ import javax.net.ssl.SSLContext
1313
import monix.eval.Task
1414

1515
import scala.concurrent.ExecutionContext
16+
import scala.concurrent.duration.Duration
1617

17-
class WalletRpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLContext])(implicit
18+
class WalletRpcClient(node: Uri, timeout: Duration, getSSLContext: () => Either[SSLError, SSLContext])(implicit
1819
system: ActorSystem,
1920
ec: ExecutionContext
20-
) extends RpcClient(node, getSSLContext)
21+
) extends RpcClient(node, timeout, getSSLContext)
2122
with Logger {
2223
import io.iohk.ethereum.jsonrpc.client.CommonJsonCodecs._
2324

src/main/scala/io/iohk/ethereum/jsonrpc/JsonRpcError.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,31 @@ package io.iohk.ethereum.jsonrpc
22

33
import io.iohk.ethereum.consensus.Protocol
44
import io.iohk.ethereum.jsonrpc.serialization.JsonEncoder
5-
import org.json4s.{JInt, JObject, JString, JValue}
5+
import org.json4s.{JLong, JInt, JObject, JString, JValue}
66

77
case class JsonRpcError(code: Int, message: String, data: Option[JValue])
88

99
// scalastyle:off magic.number
1010
// scalastyle:off public.methods.have.type
11-
object JsonRpcError {
11+
object JsonRpcError extends JsonMethodsImplicits {
12+
1213
def apply[T: JsonEncoder](code: Int, message: String, data: T): JsonRpcError =
1314
JsonRpcError(code, message, Some(JsonEncoder[T].encodeJson(data)))
1415

16+
implicit val rateLimitInformation: JsonEncoder[RateLimitInformation] = (rateLimit: RateLimitInformation) =>
17+
JObject(
18+
"backoff_seconds" -> JLong(rateLimit.backoffSeconds)
19+
)
20+
1521
implicit val jsonRpcErrorEncoder: JsonEncoder[JsonRpcError] = err =>
1622
JObject(
1723
List("code" -> JsonEncoder.encode(err.code), "message" -> JsonEncoder.encode(err.message)) ++
1824
err.data.map("data" -> _)
1925
)
2026

27+
case class RateLimitInformation(backoffSeconds: Long)
28+
def RateLimitError(backoffSeconds: Long) =
29+
JsonRpcError(-32005, "request rate exceeded", RateLimitInformation(backoffSeconds))
2130
val ParseError = JsonRpcError(-32700, "An error occurred on the server while parsing the JSON text", None)
2231
val InvalidRequest = JsonRpcError(-32600, "The JSON sent is not a valid Request object", None)
2332
val MethodNotFound = JsonRpcError(-32601, "The method does not exist / is not available", None)

src/main/scala/io/iohk/ethereum/jsonrpc/client/RpcClient.scala

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import java.util.UUID
55

66
import akka.actor.ActorSystem
77
import akka.http.scaladsl.model._
8+
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
89
import akka.http.scaladsl.unmarshalling.Unmarshal
910
import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
11+
import akka.stream.StreamTcpException
12+
import akka.stream.scaladsl.TcpIdleTimeoutException
1013
import io.circe.generic.auto._
1114
import io.circe.parser.parse
1215
import io.circe.syntax._
@@ -18,8 +21,9 @@ import javax.net.ssl.SSLContext
1821
import monix.eval.Task
1922

2023
import scala.concurrent.ExecutionContext
24+
import scala.concurrent.duration._
2125

22-
abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLContext])(implicit
26+
abstract class RpcClient(node: Uri, timeout: Duration, getSSLContext: () => Either[SSLError, SSLContext])(implicit
2327
system: ActorSystem,
2428
ec: ExecutionContext
2529
) extends Logger {
@@ -32,6 +36,12 @@ abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLCon
3236
Http().defaultClientHttpsContext
3337
}
3438

39+
lazy val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(system)
40+
.withConnectionSettings(
41+
ClientConnectionSettings(system)
42+
.withIdleTimeout(timeout)
43+
)
44+
3545
protected def doRequest[T: Decoder](method: String, args: Seq[Json]): RpcResponse[T] = {
3646
doJsonRequest(method, args).map(_.flatMap(getResult[T]))
3747
}
@@ -60,11 +70,21 @@ abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLCon
6070

6171
Task
6272
.deferFuture(for {
63-
response <- Http().singleRequest(request, connectionContext)
73+
response <- Http().singleRequest(request, connectionContext, connectionPoolSettings)
6474
data <- Unmarshal(response.entity).to[String]
65-
} yield parse(data).left.map(e => RpcClientError(e.message)))
75+
} yield parse(data).left.map(e => ParserError(e.message)))
6676
.onErrorHandle { ex: Throwable =>
67-
Left(RpcClientError(s"RPC request failed: ${exceptionToString(ex)}"))
77+
ex match {
78+
case _: TcpIdleTimeoutException =>
79+
log.error("RPC request", ex)
80+
Left(ConnectionError(s"RPC request timeout"))
81+
case _: StreamTcpException =>
82+
log.error("Connection not established", ex)
83+
Left(ConnectionError(s"Connection not established"))
84+
case _ =>
85+
log.error("RPC request failed", ex)
86+
Left(RpcClientError("RPC request failed"))
87+
}
6888
}
6989
}
7090

@@ -97,5 +117,7 @@ object RpcClient {
97117

98118
case class ParserError(msg: String) extends RpcError
99119

120+
case class ConnectionError(msg: String) extends RpcError
121+
100122
case class RpcClientError(msg: String) extends RpcError
101123
}

src/main/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServer.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,19 @@ import ch.megard.akka.http.cors.javadsl.CorsRejection
1010
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
1111
import ch.megard.akka.http.cors.scaladsl.model.HttpOriginMatcher
1212
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
13+
import com.typesafe.config.{Config => TypesafeConfig}
1314
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
1415
import io.iohk.ethereum.faucet.jsonrpc.FaucetJsonRpcController
1516
import io.iohk.ethereum.jsonrpc._
16-
import io.iohk.ethereum.security.SSLError
1717
import io.iohk.ethereum.jsonrpc.serialization.JsonSerializers
1818
import io.iohk.ethereum.jsonrpc.server.controllers.JsonRpcBaseController
1919
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
20+
import io.iohk.ethereum.security.SSLError
2021
import io.iohk.ethereum.utils.{ConfigUtils, Logger}
2122
import javax.net.ssl.SSLContext
2223
import monix.eval.Task
2324
import monix.execution.Scheduler.Implicits.global
2425
import org.json4s.{DefaultFormats, JInt, native}
25-
import com.typesafe.config.{Config => TypesafeConfig}
2626

2727
import scala.concurrent.duration.{FiniteDuration, _}
2828

@@ -73,10 +73,14 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
7373
}
7474

7575
def handleRateLimitedRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
76-
if (isBelowRateLimit(clientAddress)) {
77-
log.warn(s"Request limit exceeded for ip ${clientAddress.toIP.getOrElse("unknown")}")
76+
if (isBelowRateLimit(clientAddress))
7877
complete(jsonRpcController.handleRequest(request).runToFuture)
79-
} else complete(StatusCodes.TooManyRequests)
78+
else {
79+
log.warn(s"Request limit exceeded for ip ${clientAddress.toIP.getOrElse("unknown")}")
80+
complete(
81+
(StatusCodes.TooManyRequests, JsonRpcError.RateLimitError(config.rateLimit.minRequestInterval.toSeconds))
82+
)
83+
}
8084
}
8185

8286
/**
@@ -113,7 +117,7 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
113117
.traverse(requests)(request => jsonRpcController.handleRequest(request))
114118
.runToFuture
115119
}
116-
} else complete(StatusCodes.MethodNotAllowed)
120+
} else complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
117121
}
118122
}
119123

src/test/resources/application.conf

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,17 +156,18 @@ faucet {
156156
rpc-client {
157157
rpc-address = "http://127.0.0.1:8546/"
158158
certificate = null
159+
timeout = 2.seconds
159160
}
160161

161162
min-request-interval = 1.minute
162163

163164
handler-timeout = 2.seconds
164165

165-
response-timeout = 2.seconds
166+
actor-communication-margin = 1.seconds
166167

167168
supervisor {
168169
min-backoff = 3.seconds
169-
man-backoff = 30.seconds
170+
max-backoff = 30.seconds
170171
random-factor = 0.2
171172
auto-reset = 10.seconds
172173
attempts = 4

src/test/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetRpcServiceSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io.iohk.ethereum.faucet.FaucetHandler.FaucetHandlerResponse.{
1313
}
1414
import io.iohk.ethereum.faucet.FaucetStatus.WalletAvailable
1515
import io.iohk.ethereum.faucet.jsonrpc.FaucetDomain.{SendFundsRequest, StatusRequest}
16-
import io.iohk.ethereum.faucet.{FaucetConfig, SupervisorConfig}
16+
import io.iohk.ethereum.faucet.{FaucetConfig, RpcClientConfig, SupervisorConfig}
1717
import io.iohk.ethereum.jsonrpc.JsonRpcError
1818
import io.iohk.ethereum.testing.ActorsTesting.simpleAutoPilot
1919
import io.iohk.ethereum.{NormalPatience, WithActorSystemShutDown}
@@ -133,11 +133,11 @@ class FaucetRpcServiceSpec
133133
txGasPrice = 10,
134134
txGasLimit = 20,
135135
txValue = 1,
136-
rpcAddress = "",
136+
rpcClient = RpcClientConfig(address = "", timeout = 10.seconds),
137137
keyStoreDir = "",
138138
minRequestInterval = 10.seconds,
139139
handlerTimeout = 10.seconds,
140-
responseTimeout = 10.seconds,
140+
actorCommunicationMargin = 10.seconds,
141141
supervisor = mock[SupervisorConfig],
142142
shutdownTimeout = 15.seconds
143143
)

src/test/scala/io/iohk/ethereum/faucet/jsonrpc/WalletServiceSpec.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import java.security.SecureRandom
55
import akka.util.ByteString
66
import io.iohk.ethereum.crypto._
77
import io.iohk.ethereum.domain.{Address, Transaction}
8-
import io.iohk.ethereum.faucet.{FaucetConfig, SupervisorConfig}
8+
import io.iohk.ethereum.faucet.{FaucetConfig, RpcClientConfig, SupervisorConfig}
9+
import io.iohk.ethereum.jsonrpc.client.RpcClient.ConnectionError
910
import io.iohk.ethereum.keystore.KeyStore.DecryptionFailed
1011
import io.iohk.ethereum.keystore.{KeyStore, Wallet}
1112
import io.iohk.ethereum.network.p2p.messages.CommonMessages.SignedTransactions.SignedTransactionEnc
@@ -21,7 +22,7 @@ import scala.concurrent.duration._
2122

2223
class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {
2324

24-
"Wallet Service" should "send a transaction" in new TestSetup {
25+
"Wallet Service" should "send a transaction successfully when getNonce and sendTransaction successfully" in new TestSetup {
2526

2627
val receivingAddress = Address("0x99")
2728
val currentNonce = 2
@@ -44,6 +45,17 @@ class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {
4445

4546
}
4647

48+
it should "failure the transaction when get timeout of getNonce" in new TestSetup {
49+
50+
val timeout = ConnectionError("timeout")
51+
(walletRpcClient.getNonce _).expects(config.walletAddress).returning(Task.pure(Left(timeout)))
52+
53+
val res = walletService.sendFunds(wallet, Address("0x99")).runSyncUnsafe()
54+
55+
res shouldEqual Left(timeout)
56+
57+
}
58+
4759
it should "get wallet successful" in new TestSetup {
4860
(mockKeyStore.unlockAccount _).expects(config.walletAddress, config.walletPassword).returning(Right(wallet))
4961

@@ -76,11 +88,11 @@ class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {
7688
txGasPrice = 10,
7789
txGasLimit = 20,
7890
txValue = 1,
79-
rpcAddress = "",
91+
rpcClient = RpcClientConfig("", timeout = 10.seconds),
8092
keyStoreDir = "",
8193
minRequestInterval = 10.seconds,
8294
handlerTimeout = 10.seconds,
83-
responseTimeout = 10.seconds,
95+
actorCommunicationMargin = 10.seconds,
8496
supervisor = mock[SupervisorConfig],
8597
shutdownTimeout = 15.seconds
8698
)

src/universal/conf/faucet.conf

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ faucet {
4040
# null value indicates HTTPS is not being used
4141
# password-file = "tls/password"
4242
#}
43+
44+
# Response time-out from rpc client resolve
45+
timeout = 3.seconds
4346
}
4447

4548
# How often can a single IP address send a request
@@ -49,12 +52,12 @@ faucet {
4952
handler-timeout = 1.seconds
5053

5154
# Response time-out from actor resolve
52-
response-timeout = 3.seconds
55+
actor-communication-margin = 1.seconds
5356

5457
# Supervisor with BackoffSupervisor pattern
5558
supervisor {
5659
min-backoff = 3.seconds
57-
man-backoff = 30.seconds
60+
max-backoff = 30.seconds
5861
random-factor = 0.2
5962
auto-reset = 10.seconds
6063
attempts = 4

0 commit comments

Comments
 (0)